MQTTAsync.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp. and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial implementation and documentation
15  * Ian Craggs, Allan Stockdill-Mander - SSL support
16  * Ian Craggs - multiple server connection support
17  * Ian Craggs - fix for bug 413429 - connectionLost not called
18  * Ian Craggs - fix for bug 415042 - using already freed structure
19  * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
20  * Ian Craggs - fix for bug 420851
21  * Ian Craggs - fix for bug 432903 - queue persistence
22  * Ian Craggs - MQTT 3.1.1 support
23  * Rong Xiang, Ian Craggs - C++ compatibility
24  * Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
25  * Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
26  * Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
27  * Ian Craggs - fix for bug 465369 - longer latency than expected
28  * Ian Craggs - fix for bug 444103 - success/failure callbacks not invoked
29  * Ian Craggs - fix for bug 484363 - segfault in getReadySocket
30  * Ian Craggs - automatic reconnect and offline buffering (send while disconnected)
31  * Ian Craggs - fix for bug 472250
32  * Ian Craggs - fix for bug 486548
33  * Ian Craggs - SNI support
34  * Ian Craggs - auto reconnect timing fix #218
35  * Ian Craggs - fix for issue #190
36  * Ian Craggs - check for NULL SSL options #334
37  * Ian Craggs - allocate username/password buffers #431
38  * Ian Craggs - MQTT 5.0 support
39  *******************************************************************************/
40 
47 #include <stdlib.h>
48 #include <string.h>
49 #if !defined(_WIN32) && !defined(_WIN64)
50  #include <sys/time.h>
51 #endif
52 
53 #if !defined(NO_PERSISTENCE)
54 #include "MQTTPersistence.h"
55 #endif
56 #include "MQTTAsync.h"
57 #include "utf-8.h"
58 #include "MQTTProtocol.h"
59 #include "MQTTProtocolOut.h"
60 #include "Thread.h"
61 #include "SocketBuffer.h"
62 #include "StackTrace.h"
63 #include "Heap.h"
64 #include "OsWrapper.h"
65 #include "WebSocket.h"
66 
67 #define URI_TCP "tcp://"
68 #define URI_WS "ws://"
69 #define URI_WSS "wss://"
70 
71 #include "VersionInfo.h"
72 
73 const char *client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
74 const char *client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
75 
76 // global objects init declaration
77 int MQTTAsync_init(void);
78 
80 {
82 #if defined(OPENSSL)
84 #endif
85 }
86 
87 #if !defined(min)
88 #define min(a, b) (((a) < (b)) ? (a) : (b))
89 #endif
90 
91 #if defined(WIN32) || defined(WIN64)
92 void MQTTAsync_init_rand(void)
93 {
95  srand(now);
96 }
97 #elif defined(AIX)
98 void MQTTAsync_init_rand(void)
99 {
101  srand(now.tv_nsec);
102 }
103 #else
105 {
107  srand(now.tv_usec);
108 }
109 #endif
110 
112 {
113  CLIENT_VERSION, /* version */
114  NULL /* client list */
115 };
116 
118 
120 
122 {
124 };
125 
130 
131 #if defined(_WIN32) || defined(_WIN64)
132 static mutex_type mqttasync_mutex = NULL;
133 static mutex_type socket_mutex = NULL;
134 static mutex_type mqttcommand_mutex = NULL;
135 static sem_type send_sem = NULL;
136 #if !defined(NO_HEAP_TRACKING)
137 extern mutex_type stack_mutex;
138 extern mutex_type heap_mutex;
139 #endif
140 extern mutex_type log_mutex;
141 
142 int MQTTAsync_init(void)
143 {
144  DWORD rc = 0;
145 
146  if (mqttasync_mutex == NULL)
147  {
148  if ((mqttasync_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
149  {
150  rc = GetLastError();
151  printf("mqttasync_mutex error %d\n", rc);
152  goto exit;
153  }
154  if ((mqttcommand_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
155  {
156  rc = GetLastError();
157  printf("mqttcommand_mutex error %d\n", rc);
158  goto exit;
159  }
160  if ((send_sem = CreateEvent(
161  NULL, /* default security attributes */
162  FALSE, /* manual-reset event? */
163  FALSE, /* initial state is nonsignaled */
164  NULL /* object name */
165  )) == NULL)
166  {
167  rc = GetLastError();
168  printf("send_sem error %d\n", rc);
169  goto exit;
170  }
171 #if !defined(NO_HEAP_TRACKING)
172  if ((stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
173  {
174  rc = GetLastError();
175  printf("stack_mutex error %d\n", rc);
176  goto exit;
177  }
178  if ((heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
179  {
180  rc = GetLastError();
181  printf("heap_mutex error %d\n", rc);
182  goto exit;
183  }
184 #endif
185  if ((log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
186  {
187  rc = GetLastError();
188  printf("log_mutex error %d\n", rc);
189  goto exit;
190  }
191  if ((socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
192  {
193  rc = GetLastError();
194  printf("socket_mutex error %d\n", rc);
195  goto exit;
196  }
197  }
198  else
199  {
200  Log(TRACE_MAX, -1, "Library already initialized");
201  }
202 exit:
203  return rc;
204 }
205 
206 void MQTTAsync_cleanup(void)
207 {
208  if (send_sem)
209  CloseHandle(send_sem);
210 #if !defined(NO_HEAP_TRACKING)
211  if (stack_mutex)
212  CloseHandle(stack_mutex);
213  if (heap_mutex)
214  CloseHandle(heap_mutex);
215 #endif
216  if (log_mutex)
217  CloseHandle(log_mutex);
218  if (socket_mutex)
219  CloseHandle(socket_mutex);
220  if (mqttasync_mutex)
221  CloseHandle(mqttasync_mutex);
222 }
223 
224 #if defined(PAHO_MQTT_STATIC)
225 static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT; /* Global for one time initialization */
226 
227 /* This runs at most once */
228 BOOL CALLBACK InitMutexesOnce (
229  PINIT_ONCE InitOnce, /* Pointer to one-time initialization structure */
230  PVOID Parameter, /* Optional parameter */
231  PVOID *lpContext) /* Return data, if any */
232 {
233  int rc = MQTTAsync_init();
234  return rc == 0;
235 }
236 #else
237 BOOL APIENTRY DllMain(HANDLE hModule,
238  DWORD ul_reason_for_call,
239  LPVOID lpReserved)
240 {
241  switch (ul_reason_for_call)
242  {
243  case DLL_PROCESS_ATTACH:
244  MQTTAsync_init();
245  break;
246  case DLL_THREAD_ATTACH:
247  break;
248  case DLL_THREAD_DETACH:
249  break;
250  case DLL_PROCESS_DETACH:
251  if (lpReserved)
252  MQTTAsync_cleanup();
253  break;
254  }
255  return TRUE;
256 }
257 #endif
258 
259 
260 #else
261 static pthread_mutex_t mqttasync_mutex_store = PTHREAD_MUTEX_INITIALIZER;
263 
264 static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
266 
267 static pthread_mutex_t mqttcommand_mutex_store = PTHREAD_MUTEX_INITIALIZER;
269 
270 static cond_type_struct send_cond_store = { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER };
272 
273 int MQTTAsync_init(void)
274 {
275  pthread_mutexattr_t attr;
276  int rc;
277 
278  pthread_mutexattr_init(&attr);
279 #if !defined(_WRS_KERNEL)
280  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
281 #else
282  /* #warning "no pthread_mutexattr_settype" */
283 #endif
284  if ((rc = pthread_mutex_init(mqttasync_mutex, &attr)) != 0)
285  printf("MQTTAsync: error %d initializing async_mutex\n", rc);
286  else if ((rc = pthread_mutex_init(mqttcommand_mutex, &attr)) != 0)
287  printf("MQTTAsync: error %d initializing command_mutex\n", rc);
288  else if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
289  printf("MQTTClient: error %d initializing socket_mutex\n", rc);
290  else if ((rc = pthread_cond_init(&send_cond->cond, NULL)) != 0)
291  printf("MQTTAsync: error %d initializing send_cond cond\n", rc);
292  else if ((rc = pthread_mutex_init(&send_cond->mutex, &attr)) != 0)
293  printf("MQTTAsync: error %d initializing send_cond mutex\n", rc);
294 
295  return rc;
296 }
297 
298 #define WINAPI
299 #endif
300 
301 static volatile int global_initialized = 0;
302 static List* handles = NULL;
303 static int tostop = 0;
304 static List* commands = NULL;
305 
306 typedef struct
307 {
309  char* topicName;
310  int topicLen;
311  unsigned int seqno; /* only used on restore */
312 } qEntry;
313 
314 typedef struct
315 {
316  int type;
322  void* context;
325  union
326  {
327  struct
328  {
329  int count;
330  char** topics;
331  int* qoss;
334  } sub;
335  struct
336  {
337  int count;
338  char** topics;
339  } unsub;
340  struct
341  {
344  void* payload;
345  int qos;
346  int retained;
347  } pub;
348  struct
349  {
350  int internal;
351  int timeout;
352  enum MQTTReasonCodes reasonCode;
353  } dis;
354  struct
355  {
358  } conn;
359  } details;
361 
362 
363 typedef struct MQTTAsync_struct
364 {
365  char* serverURI;
366  int ssl;
369 
370  /* "Global", to the client, callback definitions */
374  void* clContext; /* the context to be associated with the conn lost callback*/
375  void* maContext; /* the context to be associated with the msg arrived callback*/
376  void* dcContext; /* the context to be associated with the deliv complete callback*/
377 
379  void* connected_context; /* the context to be associated with the connected callback*/
380 
382  void* disconnected_context; /* the context to be associated with the disconnected callback*/
383 
386 
387  /* Each time connect is called, we store the options that were used. These are reused in
388  any call to reconnect, or an automatic reconnect attempt */
389  MQTTAsync_command connect; /* Connect operation properties */
390  MQTTAsync_command disconnect; /* Disconnect operation properties */
391  MQTTAsync_command* pending_write; /* Is there a socket write pending? */
392 
394  unsigned int command_seqno;
395 
397 
398  /* added for offline buffering */
401  int noBufferedMessages; /* the current number of buffered (publish) messages for this client */
402 
403  /* added for automatic reconnect */
408  char** serverURIs;
410 
414  int retrying;
416 
417  /* MQTT V5 properties */
420 
421 } MQTTAsyncs;
422 
423 
424 typedef struct
425 {
428  unsigned int seqno; /* only used on restore */
430  char* key; /* if not_restored, this holds the key */
432 
433 
434 static int clientSockCompare(void* a, void* b);
435 static void MQTTAsync_lock_mutex(mutex_type amutex);
436 static void MQTTAsync_unlock_mutex(mutex_type amutex);
438 static void MQTTAsync_terminate(void);
439 #if !defined(NO_PERSISTENCE)
442 static MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int buflen, int MQTTVersion, MQTTAsync_queuedCommand*);
443 /*static void MQTTAsync_insertInOrder(List* list, void* content, int size);*/
445 #endif
446 static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size);
448 static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command);
449 static void MQTTProtocol_checkPendingWrites(void);
450 static void MQTTAsync_freeServerURIs(MQTTAsyncs* m);
451 static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command);
452 static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command);
453 static void MQTTAsync_writeComplete(int socket, int rc);
454 static int MQTTAsync_processCommand(void);
455 static void MQTTAsync_checkTimeouts(void);
458 static void MQTTAsync_freeResponses(MQTTAsyncs* m);
459 static void MQTTAsync_freeCommands(MQTTAsyncs* m);
461 static int MQTTAsync_completeConnection(MQTTAsyncs* m, Connack* connack);
463 static void MQTTAsync_stop(void);
464 static void MQTTAsync_closeOnly(Clients* client, enum MQTTReasonCodes reasonCode, MQTTProperties* props);
466 static int clientStructCompare(void* a, void* b);
468 static int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topicLen, MQTTAsync_message* mm);
469 static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* options, int internal);
470 static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout);
471 static int cmdMessageIDCompare(void* a, void* b);
472 static int MQTTAsync_assignMsgId(MQTTAsyncs* m);
474 static void MQTTAsync_retry(void);
475 static int MQTTAsync_connecting(MQTTAsyncs* m);
476 static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc);
477 /*static int pubCompare(void* a, void* b);*/
478 
479 
480 void MQTTAsync_sleep(long milliseconds)
481 {
482  FUNC_ENTRY;
483 #if defined(_WIN32) || defined(_WIN64)
484  Sleep(milliseconds);
485 #else
486  usleep(milliseconds*1000);
487 #endif
488  FUNC_EXIT;
489 }
490 
491 
/*
 * Add a random amount of jitter for exponential backoff on retry.
 *
 * The upper bound is 1.2 times the smaller of maxInterval and
 * currentIntervalBase; the lower bound is the larger of minInterval and
 * currentIntervalBase divided by 1.2 (both truncated to int).  The result is
 * drawn uniformly from [lower bound, upper bound] with rand().
 * https://www.awsarchitectureblog.com/2015/03/backoff.html
 * http://ee.lbl.gov/papers/sync_94.pdf
 *
 * @param currentIntervalBase the current backoff interval
 * @param minInterval the smallest interval allowed
 * @param maxInterval the largest interval allowed
 * @return the interval to wait; the lower bound itself when the bounds
 * collapse or cross
 */
int MQTTAsync_randomJitter(int currentIntervalBase, int minInterval, int maxInterval)
{
	/* written out as ternaries so the block does not depend on min/max macros */
	const int capped_base = (maxInterval < currentIntervalBase) ? maxInterval : currentIntervalBase;
	const int floored_base = (minInterval > currentIntervalBase) ? minInterval : currentIntervalBase;
	const int max_sleep = (int)(capped_base * 1.2); // (e.g. 72 if base > 60)
	const int min_sleep = (int)(floored_base / 1.2);

	if (min_sleep >= max_sleep) // shouldn't happen, but just in case
	{
		return min_sleep;
	}

	{
		/* random_between(min_sleep, max_sleep)
		   http://stackoverflow.com/questions/2509679/how-to-generate-a-random-number-from-within-a-range */
		int r;
		const int range = max_sleep - min_sleep + 1;
		const int buckets = RAND_MAX / range;
		const int limit = buckets * range;

		/* A range wider than RAND_MAX leaves no complete bucket: limit would be 0
		 * and the rejection loop below could never finish.  A single rand() value
		 * is then already smaller than range, so use it directly. */
		if (buckets == 0)
		{
			return min_sleep + rand();
		}

		/* Create equal size buckets all in a row, then fire randomly towards
		 * the buckets until you land in one of them. All buckets are equally
		 * likely. If you land off the end of the line of buckets, try again. */
		do
		{
			r = rand();
		} while (r >= limit);

		return min_sleep + r / buckets;
	}
}
528 
529 
536 static int clientSockCompare(void* a, void* b)
537 {
538  MQTTAsyncs* m = (MQTTAsyncs*)a;
539  return m->c->net.socket == *(int*)b;
540 }
541 
542 
543 static void MQTTAsync_lock_mutex(mutex_type amutex)
544 {
545  int rc = Thread_lock_mutex(amutex);
546  if (rc != 0)
547  Log(LOG_ERROR, 0, "Error %s locking mutex", strerror(rc));
548 }
549 
550 
552 {
553  int rc = Thread_unlock_mutex(amutex);
554  if (rc != 0)
555  Log(LOG_ERROR, 0, "Error %s unlocking mutex", strerror(rc));
556 }
557 
558 
559 /*
560  Check whether there are any more connect options. If not then we are finished
561  with connect attempts.
562 */
564 {
565  int rc;
566 
567  FUNC_ENTRY;
568  rc = command->details.conn.currentURI + 1 < client->serverURIcount ||
569  (command->details.conn.MQTTVersion == 4 && client->c->MQTTVersion == MQTTVERSION_DEFAULT);
570  FUNC_EXIT_RC(rc);
571  return rc;
572 }
573 
574 
575 int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
576  int persistence_type, void* persistence_context, MQTTAsync_createOptions* options)
577 {
578  int rc = 0;
579  MQTTAsyncs *m = NULL;
580 
581 #if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC)
	/* initializes mutexes once. Must come before FUNC_ENTRY */
583  BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitMutexesOnce, NULL, NULL);
584 #endif
585  FUNC_ENTRY;
587 
588  if (serverURI == NULL || clientId == NULL)
589  {
591  goto exit;
592  }
593 
594  if (!UTF8_validateString(clientId))
595  {
597  goto exit;
598  }
599 
600  if (strlen(clientId) == 0 && persistence_type == MQTTCLIENT_PERSISTENCE_DEFAULT)
601  {
603  goto exit;
604  }
605 
606  if (strstr(serverURI, "://") != NULL)
607  {
608  if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
609  && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
610 #if defined(OPENSSL)
611  && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
612  && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
613 #endif
614  )
615  {
617  goto exit;
618  }
619  }
620 
621  if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 ||
622  options->struct_version < 0 || options->struct_version > 2))
623  {
625  goto exit;
626  }
627 
628  if (!global_initialized)
629  {
630  #if !defined(NO_HEAP_TRACKING)
631  Heap_initialize();
632  #endif
634  bstate->clients = ListInitialize();
637  handles = ListInitialize();
638  commands = ListInitialize();
639 #if defined(OPENSSL)
641 #endif
642  global_initialized = 1;
643  }
644  if ((m = malloc(sizeof(MQTTAsyncs))) == NULL)
645  {
646  rc = PAHO_MEMORY_ERROR;
647  goto exit;
648  }
649  *handle = m;
650  memset(m, '\0', sizeof(MQTTAsyncs));
651  if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
652  serverURI += strlen(URI_TCP);
653  else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
654  {
655  serverURI += strlen(URI_WS);
656  m->websocket = 1;
657  }
658 #if defined(OPENSSL)
659  else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
660  {
661  serverURI += strlen(URI_SSL);
662  m->ssl = 1;
663  }
664  else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
665  {
666  serverURI += strlen(URI_WSS);
667  m->ssl = 1;
668  m->websocket = 1;
669  }
670 #endif
671  if ((m->serverURI = MQTTStrdup(serverURI)) == NULL)
672  {
673  rc = PAHO_MEMORY_ERROR;
674  goto exit;
675  }
676  m->responses = ListInitialize();
677  ListAppend(handles, m, sizeof(MQTTAsyncs));
678 
679  if ((m->c = malloc(sizeof(Clients))) == NULL)
680  {
681  rc = PAHO_MEMORY_ERROR;
682  goto exit;
683  }
684  memset(m->c, '\0', sizeof(Clients));
685  m->c->context = m;
686  m->c->outboundMsgs = ListInitialize();
687  m->c->inboundMsgs = ListInitialize();
688  m->c->messageQueue = ListInitialize();
689  m->c->clientID = MQTTStrdup(clientId);
690  if (m->c->context == NULL || m->c->outboundMsgs == NULL || m->c->inboundMsgs == NULL ||
691  m->c->messageQueue == NULL || m->c->clientID == NULL)
692  {
693  rc = PAHO_MEMORY_ERROR;
694  goto exit;
695  }
697 
698  m->shouldBeConnected = 0;
699  if (options)
700  {
701  if ((m->createOptions = malloc(sizeof(MQTTAsync_createOptions))) == NULL)
702  {
703  rc = PAHO_MEMORY_ERROR;
704  goto exit;
705  }
706  memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
707  if (options->struct_version > 0)
708  m->c->MQTTVersion = options->MQTTVersion;
709  }
710 
711 #if !defined(NO_PERSISTENCE)
712  rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
713  if (rc == 0)
714  {
715  rc = MQTTPersistence_initialize(m->c, m->serverURI); /* inflight messages restored here */
716  if (rc == 0)
717  {
720  else
721  {
724  }
725  }
726  }
727 #endif
728  ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
729 
730 exit:
732  FUNC_EXIT_RC(rc);
733  return rc;
734 }
735 
736 
737 int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
738  int persistence_type, void* persistence_context)
739 {
741 
742  return MQTTAsync_createWithOptions(handle, serverURI, clientId, persistence_type,
743  persistence_context, NULL);
744 }
745 
746 
747 static void MQTTAsync_terminate(void)
748 {
749  FUNC_ENTRY;
750  MQTTAsync_stop();
751  if (global_initialized)
752  {
753  ListElement* elem = NULL;
754  ListFree(bstate->clients);
755  ListFree(handles);
756  while (ListNextElement(commands, &elem))
758  ListFree(commands);
759  handles = NULL;
761  #if !defined(NO_HEAP_TRACKING)
762  Heap_terminate();
763  #endif
764  Log_terminate();
765  global_initialized = 0;
766  }
767  FUNC_EXIT;
768 }
769 
770 
771 #if !defined(NO_PERSISTENCE)
773 {
774  int rc = 0;
775  char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
776 
777  FUNC_ENTRY;
778  if (qcmd->client->c->MQTTVersion >= MQTTVERSION_5)
779  sprintf(key, "%s%u", PERSISTENCE_V5_COMMAND_KEY, qcmd->seqno);
780  else
781  sprintf(key, "%s%u", PERSISTENCE_COMMAND_KEY, qcmd->seqno);
782  if ((rc = qcmd->client->c->persistence->premove(qcmd->client->c->phandle, key)) != 0)
783  Log(LOG_ERROR, 0, "Error %d removing command from persistence", rc);
784  FUNC_EXIT_RC(rc);
785  return rc;
786 }
787 
788 
790 {
791  int rc = 0;
792  MQTTAsyncs* aclient = qcmd->client;
794  int* lens = NULL;
795  void** bufs = NULL;
796  int bufindex = 0, i, nbufs = 0;
797  char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
798  int props_allocated = 0;
799  int process = 1;
800 
801  FUNC_ENTRY;
802  switch (command->type)
803  {
804  case SUBSCRIBE:
805  nbufs = ((aclient->c->MQTTVersion >= MQTTVERSION_5) ? 4 : 3) +
806  (command->details.sub.count * 2);
807 
808  if (((lens = (int*)malloc(nbufs * sizeof(int))) == NULL) ||
809  ((bufs = malloc(nbufs * sizeof(char *))) == NULL))
810  {
811  rc = PAHO_MEMORY_ERROR;
812  goto exit;
813  }
814  bufs[bufindex] = &command->type;
815  lens[bufindex++] = sizeof(command->type);
816 
817  bufs[bufindex] = &command->token;
818  lens[bufindex++] = sizeof(command->token);
819 
820  bufs[bufindex] = &command->details.sub.count;
821  lens[bufindex++] = sizeof(command->details.sub.count);
822 
823  for (i = 0; i < command->details.sub.count; ++i)
824  {
825  bufs[bufindex] = command->details.sub.topics[i];
826  lens[bufindex++] = (int)strlen(command->details.sub.topics[i]) + 1;
827 
828  if (aclient->c->MQTTVersion < MQTTVERSION_5)
829  {
830  bufs[bufindex] = &command->details.sub.qoss[i];
831  lens[bufindex++] = sizeof(command->details.sub.qoss[i]);
832  }
833  else
834  {
835  if (command->details.sub.count == 1)
836  {
837  bufs[bufindex] = &command->details.sub.opts;
838  lens[bufindex++] = sizeof(command->details.sub.opts);
839  }
840  else
841  {
842  bufs[bufindex] = &command->details.sub.optlist[i];
843  lens[bufindex++] = sizeof(command->details.sub.optlist[i]);
844  }
845  }
846  }
847  break;
848 
849  case UNSUBSCRIBE:
850  nbufs = ((aclient->c->MQTTVersion >= MQTTVERSION_5) ? 4 : 3) +
851  command->details.unsub.count;
852 
853  if (((lens = (int*)malloc(nbufs * sizeof(int))) == NULL) ||
854  ((bufs = malloc(nbufs * sizeof(char *))) == NULL))
855  {
856  rc = PAHO_MEMORY_ERROR;
857  goto exit;
858  }
859 
860  bufs[bufindex] = &command->type;
861  lens[bufindex++] = sizeof(command->type);
862 
863  bufs[bufindex] = &command->token;
864  lens[bufindex++] = sizeof(command->token);
865 
866  bufs[bufindex] = &command->details.unsub.count;
867  lens[bufindex++] = sizeof(command->details.unsub.count);
868 
869  for (i = 0; i < command->details.unsub.count; ++i)
870  {
871  bufs[bufindex] = command->details.unsub.topics[i];
872  lens[bufindex++] = (int)strlen(command->details.unsub.topics[i]) + 1;
873  }
874  break;
875 
876  case PUBLISH:
877  nbufs = (aclient->c->MQTTVersion >= MQTTVERSION_5) ? 8 : 7;
878 
879  if (((lens = (int*)malloc(nbufs * sizeof(int))) == NULL) ||
880  ((bufs = malloc(nbufs * sizeof(char *))) == NULL))
881  {
882  rc = PAHO_MEMORY_ERROR;
883  goto exit;
884  }
885 
886  bufs[bufindex] = &command->type;
887  lens[bufindex++] = sizeof(command->type);
888 
889  bufs[bufindex] = &command->token;
890  lens[bufindex++] = sizeof(command->token);
891 
892  bufs[bufindex] = command->details.pub.destinationName;
893  lens[bufindex++] = (int)strlen(command->details.pub.destinationName) + 1;
894 
895  bufs[bufindex] = &command->details.pub.payloadlen;
896  lens[bufindex++] = sizeof(command->details.pub.payloadlen);
897 
898  bufs[bufindex] = command->details.pub.payload;
899  lens[bufindex++] = command->details.pub.payloadlen;
900 
901  bufs[bufindex] = &command->details.pub.qos;
902  lens[bufindex++] = sizeof(command->details.pub.qos);
903 
904  bufs[bufindex] = &command->details.pub.retained;
905  lens[bufindex++] = sizeof(command->details.pub.retained);
906  break;
907 
908  default:
909  process = 0;
910  break;
911  }
912 
913  /*
914  * Increment the command sequence number. Don't exceed the maximum value allowed
915  * by the value PERSISTENCE_MAX_KEY_LENGTH minus the max prefix string length
916  */
917  if (++aclient->command_seqno == PERSISTENCE_SEQNO_LIMIT)
918  aclient->command_seqno = 0;
919 
920 
921  if (aclient->c->MQTTVersion >= MQTTVERSION_5 && process) /* persist properties */
922  {
923  int temp_len = 0;
924  char* ptr = NULL;
925 
926  temp_len = MQTTProperties_len(&command->properties);
927  if ((ptr = bufs[bufindex] = malloc(temp_len)) == NULL)
928  {
929  rc = PAHO_MEMORY_ERROR;
930  goto exit;
931  }
932  props_allocated = bufindex;
933  rc = MQTTProperties_write(&ptr, &command->properties);
934  lens[bufindex++] = temp_len;
935  sprintf(key, "%s%u", PERSISTENCE_V5_COMMAND_KEY, aclient->command_seqno);
936  }
937  else
938  sprintf(key, "%s%u", PERSISTENCE_COMMAND_KEY, aclient->command_seqno);
939 
940  if (nbufs > 0)
941  {
942  if (aclient->c->beforeWrite)
943  rc = aclient->c->beforeWrite(aclient->c->beforeWrite_context, nbufs, (char**)bufs, lens);
944 
945  if ((rc = aclient->c->persistence->pput(aclient->c->phandle, key, nbufs, (char**)bufs, lens)) != 0)
946  Log(LOG_ERROR, 0, "Error persisting command, rc %d", rc);
947  qcmd->seqno = aclient->command_seqno;
948  }
949 exit:
950  if (props_allocated > 0)
951  free(bufs[props_allocated]);
952  if (lens)
953  free(lens);
954  if (bufs)
955  free(bufs);
956  FUNC_EXIT_RC(rc);
957  return rc;
958 }
959 
960 
961 static MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int buflen, int MQTTVersion, MQTTAsync_queuedCommand* qcommand)
962 {
963  MQTTAsync_command* command = NULL;
964  char* ptr = buffer;
965  int i;
966  size_t data_size;
967 
968  FUNC_ENTRY;
969  if (qcommand == NULL)
970  {
971  if ((qcommand = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
972  goto exit;
973  memset(qcommand, '\0', sizeof(MQTTAsync_queuedCommand));
974  qcommand->not_restored = 1; /* don't restore all the command on the first call */
975  }
976  else
977  qcommand->not_restored = 0;
978 
979  command = &qcommand->command;
980 
981  command->type = *(int*)ptr;
982  ptr += sizeof(int);
983 
984  command->token = *(MQTTAsync_token*)ptr;
985  ptr += sizeof(MQTTAsync_token);
986 
987  switch (command->type)
988  {
989  case SUBSCRIBE:
990  if (qcommand->not_restored == 0)
991  break;
992  command->details.sub.count = *(int*)ptr;
993  ptr += sizeof(int);
994 
995  if (command->details.sub.count > 0)
996  {
997  if ((command->details.sub.topics = (char **)malloc(sizeof(char *) * command->details.sub.count)) == NULL)
998  {
999  free(qcommand);
1000  qcommand = NULL;
1001  goto exit;
1002  }
1003  if (MQTTVersion < MQTTVERSION_5)
1004  {
1005  if ((command->details.sub.qoss = (int *)malloc(sizeof(int) * command->details.sub.count)) == NULL)
1006  {
1007  free(qcommand);
1008  qcommand = NULL;
1009  goto exit;
1010  }
1011  }
1012  else if (command->details.sub.count > 1)
1013  {
1014  command->details.sub.optlist = (MQTTSubscribe_options*)malloc(sizeof(MQTTSubscribe_options) * command->details.sub.count);
1015  if (command->details.sub.optlist == NULL)
1016  {
1017  free(qcommand);
1018  qcommand = NULL;
1019  goto exit;
1020  }
1021  }
1022  }
1023 
1024  for (i = 0; i < command->details.sub.count; ++i)
1025  {
1026  data_size = strlen(ptr) + 1;
1027 
1028  if ((command->details.sub.topics[i] = malloc(data_size)) == NULL)
1029  {
1030  free(qcommand);
1031  qcommand = NULL;
1032  goto exit;
1033  }
1034  strcpy(command->details.sub.topics[i], ptr);
1035  ptr += data_size;
1036 
1037  if (MQTTVersion < MQTTVERSION_5)
1038  {
1039  command->details.sub.qoss[i] = *(int*)ptr;
1040  ptr += sizeof(int);
1041  }
1042  else
1043  {
1044  if (command->details.sub.count == 1)
1045  {
1046  command->details.sub.opts = *(MQTTSubscribe_options*)ptr;
1047  ptr += sizeof(MQTTSubscribe_options);
1048  }
1049  else
1050  {
1051  command->details.sub.optlist[i] = *(MQTTSubscribe_options*)ptr;
1052  ptr += sizeof(MQTTSubscribe_options);
1053  }
1054  }
1055  }
1056  break;
1057 
1058  case UNSUBSCRIBE:
1059  if (qcommand->not_restored == 0)
1060  break;
1061  command->details.unsub.count = *(int*)ptr;
1062  ptr += sizeof(int);
1063 
1064  if (command->details.unsub.count > 0)
1065  {
1066  command->details.unsub.topics = (char **)malloc(sizeof(char *) * command->details.unsub.count);
1067  if (command->details.unsub.topics == NULL)
1068  {
1069  free(qcommand);
1070  qcommand = NULL;
1071  goto exit;
1072  }
1073  }
1074 
1075  for (i = 0; i < command->details.unsub.count; ++i)
1076  {
1077  data_size = strlen(ptr) + 1;
1078 
1079  if ((command->details.unsub.topics[i] = malloc(data_size)) == NULL)
1080  {
1081  free(qcommand);
1082  qcommand = NULL;
1083  goto exit;
1084  }
1085  strcpy(command->details.unsub.topics[i], ptr);
1086  ptr += data_size;
1087  }
1088  break;
1089 
1090  case PUBLISH:
1091  data_size = strlen(ptr) + 1;
1092  if (qcommand->not_restored == 0)
1093  {
1094  if ((command->details.pub.destinationName = malloc(data_size)) == NULL)
1095  {
1096  free(qcommand);
1097  qcommand = NULL;
1098  goto exit;
1099  }
1100  strcpy(command->details.pub.destinationName, ptr);
1101  }
1102  ptr += data_size;
1103 
1104  command->details.pub.payloadlen = *(int*)ptr;
1105  ptr += sizeof(int);
1106 
1107  data_size = command->details.pub.payloadlen;
1108  if (qcommand->not_restored == 0)
1109  {
1110  if ((command->details.pub.payload = malloc(data_size)) == NULL)
1111  {
1112  free(qcommand);
1113  qcommand = NULL;
1114  goto exit;
1115  }
1116  memcpy(command->details.pub.payload, ptr, data_size);
1117  }
1118  ptr += data_size;
1119 
1120  command->details.pub.qos = *(int*)ptr;
1121  ptr += sizeof(int);
1122 
1123  command->details.pub.retained = *(int*)ptr;
1124  ptr += sizeof(int);
1125  break;
1126 
1127  default:
1128  free(qcommand);
1129  qcommand = NULL;
1130 
1131  }
1132  if (qcommand != NULL && qcommand->not_restored == 0 && MQTTVersion >= MQTTVERSION_5 &&
1133  MQTTProperties_read(&command->properties, &ptr, buffer + buflen) != 1)
1134  {
1135  Log(LOG_ERROR, -1, "Error restoring properties from persistence");
1136  free(qcommand);
1137  qcommand = NULL;
1138  }
1139 exit:
1140  FUNC_EXIT;
1141  return qcommand;
1142 }
1143 
1144 struct keyloc {
1145  int seqno;
1147 };
1148 
1155 static void MQTTAsync_insertInOrder(List* list, void* content, int size, struct keyloc* keyloc_array, int array_size)
1156 {
1157  ListElement* insert_point = NULL;
1158  ListElement* inserted = NULL;
1159  int seqno = ((MQTTAsync_queuedCommand*)content)->seqno;
1160  int low = 0;
1161 
1162  FUNC_ENTRY;
1163  /*printf("\nseqno %d array_size %d\n", seqno, array_size);
1164 
1165  int i;
1166  for (i = 0; i < array_size; ++i)
1167  printf("%d ", keyloc_array[i].seqno);
1168  printf("\n"); */
1169 
1170  /* look through keyloc array to find location to insert message - binary search by sequence number */
1171  if (array_size > 0 && seqno > keyloc_array[array_size - 1].seqno)
1172  {
1173  low = array_size - 1;
1174  }
1175  else if (array_size > 0)
1176  {
1177  int high = array_size - 1;
1178  int divide_index = array_size / 2;
1179 
1180  //printf("divide index %d\n", divide_index);
1181  //int count = 0;
1182  while (high - low > 1)
1183  {
1184  if (seqno < keyloc_array[divide_index].seqno)
1185  high = divide_index;
1186  else
1187  low = divide_index;
1188 
1189  divide_index = (high - low) / 2 + low;
1190  /*printf("high %d low %d divide_index %d\n", high, low, divide_index);
1191  if (++count == 10)
1192  exit(99);*/
1193  }
1194  }
1195  //printf("low %d\n", low);
1196 
1197  insert_point = keyloc_array[low].elem;
1198  if (insert_point)
1199  insert_point = insert_point->next;
1200 
1201  //if (insert_point)
1202  // printf("insert after %d %d\n", keyloc_array[low].seqno, ((MQTTAsync_queuedCommand*)insert_point->content)->seqno);
1203  inserted = ListInsert(list, content, size, insert_point);
1204 
1205  if (array_size > 0 && low + 1 < array_size)
1206  {
1207  /* make space for new entry in array after low point*/
1208  //printf("array_size - low %d\n", array_size - low);
1209  memmove(&keyloc_array[low+2], &keyloc_array[low+1], (array_size - low - 1) * sizeof(struct keyloc));
1210  }
1211  keyloc_array[low+1].seqno = seqno;
1212  keyloc_array[low+1].elem = inserted;
1213 
1214  /*if (array_size > 50)
1215  exit(99);*/
1216 
1217  FUNC_EXIT;
1218 }
1219 
/**
 * Extract the number that follows the first '-' of a persistence key
 * (key format is tag'-'seqno).
 *
 * @param key the key string; may be NULL
 * @return the parsed number, or -1 when key is NULL or contains no '-'
 */
static long cmpkeys_seqno(const char *key)
{
	const char *dash = (key != NULL) ? strchr(key, '-') : NULL;

	if (dash == NULL)
		return -1L; /* malformed key: order it before any non-negative sequence number */
	return strtol(dash + 1, NULL, 10); /* unlike atoi, well defined when the value is out of range */
}

/**
 * qsort() comparison function ordering persistence keys by the sequence number
 * after their first '-'.
 *
 * @param p1 pointer to the first key (a pointer to a char pointer)
 * @param p2 pointer to the second key (a pointer to a char pointer)
 * @return -1, 0 or 1 when the first key's number is lower than, equal to or
 *         higher than the second key's number
 */
static int cmpkeys(const void *p1, const void *p2)
{
	long key1 = cmpkeys_seqno(*(const char * const *)p1);
	long key2 = cmpkeys_seqno(*(const char * const *)p2);

	return (key1 == key2) ? 0 : ((key1 < key2) ? -1 : 1);
}
1227 
1228 
/**
 * Restore the persisted commands of a client into the commands list, in sequence-number order.
 *
 * The persistence keys are sorted by sequence number; every key that starts with
 * PERSISTENCE_COMMAND_KEY or PERSISTENCE_V5_COMMAND_KEY is read and restored, all other keys
 * are skipped. A sentinel command with sequence number -1 is appended first and removed from
 * the head of the list afterwards.
 *
 * Returns 0 on success, PAHO_MEMORY_ERROR on allocation failure, or the error code from the
 * persistence pkeys/pget or afterRead callbacks.
 *
 * NOTE(review): the function header (source line 1229) is missing from this listing; the body
 * uses a parameter named client.
 * NOTE(review): the early "goto exit" paths do not free msgkeys or its strings.
 * NOTE(review): removing the sentinel with ListRemoveHead assumes it is the head element,
 * i.e. that the commands list was empty on entry - confirm.
 */
1230 {
1231  int rc = 0;
1232  char **msgkeys;
1233  int nkeys;
1234  int i = 0;
1235  Clients* c = client->c;
1236  int commands_restored = 0;
1237 
1238  FUNC_ENTRY;
1239  if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0 && nkeys > 0)
1240  {
1241  MQTTAsync_queuedCommand* sentinel = NULL;
1242  /* keep track of location of nkeys key locations for fast insert */
1243  struct keyloc* keyloc_array = malloc(sizeof(struct keyloc) * (nkeys + 1));
1244 
1245  /* let's have the sequence number array sorted */
1246  qsort(msgkeys, (size_t)nkeys, sizeof(char*), cmpkeys);
1247  /*for (i = 0; i < nkeys; ++i) printf("%s ", msgkeys[i]); printf("\n");*/
1248  if (keyloc_array == NULL)
1249  {
1250  rc = PAHO_MEMORY_ERROR;
1251  goto exit;
1252  }
1253  sentinel = malloc(sizeof(MQTTAsync_queuedCommand));
1254  if (sentinel == NULL)
1255  {
1256  free(keyloc_array);
1257  rc = PAHO_MEMORY_ERROR;
1258  goto exit;
1259  }
1260  sentinel->seqno = -1;
1261  keyloc_array[0].seqno = -1;
1262  keyloc_array[0].elem = ListAppend(commands, sentinel, sizeof(MQTTAsync_queuedCommand));
1263 
 /* visit every key even after an error, so that all key strings are freed;
  * once rc is non-zero the remaining keys are only released, not restored */
1264  while (i < nkeys)
1265  {
1266  char *buffer = NULL;
1267  int buflen;
1268 
1269  if (rc != 0 || (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) != 0 &&
1270  strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) != 0))
1271  {
1272  ;
1273  }
1274  else
1275  {
1276  MQTTAsync_queuedCommand* cmd = NULL;
1277  if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
1278  (c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
1279  {
 /* NOTE(review): the continuation of the next expression (source line 1281) is missing from this listing */
1280  int MQTTVersion = (strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
1282  cmd = MQTTAsync_restoreCommand(buffer, buflen, MQTTVersion, NULL);
1283  /* As the entire command is not restored on the first read to save memory, we temporarily store
1284  * the filename of the persisted command to be used when restoreCommand is called the second time.
1285  */
 /* only touch cmd when the restore succeeded; a failed key allocation leaves cmd->key NULL */
1286  if (cmd != NULL && (cmd->key = malloc(strlen(msgkeys[i])+1)) != NULL)
1287  strcpy(cmd->key, msgkeys[i]);
1288  }
1289 
1290  if (cmd)
1291  {
1292  cmd->client = client;
1293  cmd->seqno = atoi(strchr(msgkeys[i], '-')+1); /* key format is tag'-'seqno */
 /* entries in use in keyloc_array: the sentinel plus one per command restored so far
  * (skipped keys add no entry, so the key index i must not be used here) */
1294  MQTTAsync_insertInOrder(commands, cmd, sizeof(MQTTAsync_queuedCommand), keyloc_array, commands_restored + 1);
1295  if (buffer)
1296  free(buffer);
1297  client->command_seqno = max(client->command_seqno, cmd->seqno);
1298  commands_restored++;
1299  if (cmd->command.type == PUBLISH)
1300  client->noBufferedMessages++;
1301  }
1302  }
1303  if (msgkeys[i])
1304  free(msgkeys[i]);
1305  i++;
1306  }
1307  if (msgkeys != NULL)
1308  free(msgkeys);
1309 
1310  /*int j; for (j = 0; j < i + 1; ++j) printf("%d ", keyloc_array[j].seqno); printf("\n"); */
1311  ListRemoveHead(commands); /* remove sentinel */
1312  /*ListElement* pos = NULL; while (ListNextElement(commands, &pos)) printf("%d ", ((MQTTAsync_queuedCommand*)(pos->content))->seqno); printf("\n");*/
1313 
1314  free(keyloc_array);
1315  }
1316 exit:
1317  Log(TRACE_MINIMUM, -1, "%d commands restored for client %s", commands_restored, c->clientID);
1318  FUNC_EXIT_RC(rc);
1319  return rc;
1320 }
1321 
1322 
/**
 * Remove all persisted queued commands and queued messages of a client from its persistence store.
 *
 * Every key starting with one of the command or queue key prefixes is removed with premove.
 * After the first premove error no further removals are attempted, but the remaining key
 * strings are still freed.
 *
 * Returns 0 on success, otherwise the error code from the persistence pkeys or premove call.
 *
 * NOTE(review): the function header (source line 1323) is missing from this listing; the body
 * uses a parameter named c.
 */
1324 {
1325  int rc = 0;
1326  char **msgkeys;
1327  int nkeys;
1328  int i = 0;
1329  int messages_deleted = 0;
1330 
1331  FUNC_ENTRY;
1332  if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
1333  {
 /* visit every key so that each key string is freed, even after a removal error */
1334  while (i < nkeys)
1335  {
1336  if (rc == 0 && (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0 ||
1337  strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0 ||
1338  strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0 ||
1339  strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0))
1340  {
1341  if ((rc = c->persistence->premove(c->phandle, msgkeys[i])) == 0)
1342  messages_deleted++;
1343  else
1344  Log(LOG_ERROR, 0, "Error %d removing queued message from persistence", rc);
1345  }
1346  if (msgkeys[i])
1347  free(msgkeys[i]);
1348  i++;
1349  }
1350  if (msgkeys != NULL)
1351  free(msgkeys);
1352  }
1353  Log(TRACE_MINIMUM, -1, "%d queued messages deleted for client %s", messages_deleted, c->clientID);
1354  FUNC_EXIT_RC(rc);
1355  return rc;
1356 }
1357 
1358 
/**
 * Remove all persisted inflight messages of a client from its persistence store.
 *
 * Every key starting with one of the publish-sent, pubrel or publish-received prefixes is
 * removed with premove. After the first premove error no further removals are attempted,
 * but the remaining key strings are still freed.
 *
 * Returns 0 on success, otherwise the error code from the persistence pkeys or premove call.
 *
 * NOTE(review): the function header (source line 1359) is missing from this listing; the body
 * uses a parameter named c.
 */
1360 {
1361  int rc = 0;
1362  char **msgkeys;
1363  int nkeys;
1364  int i = 0;
1365  int messages_deleted = 0;
1366 
1367  FUNC_ENTRY;
1368  if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
1369  {
 /* visit every key so that each key string is freed, even after a removal error */
1370  while (i < nkeys)
1371  {
1372  if (rc == 0 && (strncmp(msgkeys[i], PERSISTENCE_PUBLISH_SENT, strlen(PERSISTENCE_PUBLISH_SENT)) == 0 ||
1373  strncmp(msgkeys[i], PERSISTENCE_V5_PUBLISH_SENT, strlen(PERSISTENCE_V5_PUBLISH_SENT)) == 0 ||
1374  strncmp(msgkeys[i], PERSISTENCE_PUBREL, strlen(PERSISTENCE_PUBREL)) == 0 ||
1375  strncmp(msgkeys[i], PERSISTENCE_V5_PUBREL, strlen(PERSISTENCE_V5_PUBREL)) == 0 ||
1376  strncmp(msgkeys[i], PERSISTENCE_PUBLISH_RECEIVED, strlen(PERSISTENCE_PUBLISH_RECEIVED)) == 0 ||
1377  strncmp(msgkeys[i], PERSISTENCE_V5_PUBLISH_RECEIVED, strlen(PERSISTENCE_V5_PUBLISH_RECEIVED)) == 0))
1378  {
1379  if ((rc = c->persistence->premove(c->phandle, msgkeys[i])) == 0)
1380  messages_deleted++;
1381  else
1382  Log(LOG_ERROR, 0, "Error %d removing inflight message from persistence", rc);
1383  }
1384  if (msgkeys[i])
1385  free(msgkeys[i]);
1386  i++;
1387  }
1388  if (msgkeys != NULL)
1389  free(msgkeys);
1390  }
1391  Log(TRACE_MINIMUM, -1, "%d inflight messages deleted for client %s", messages_deleted, c->clientID);
1392  FUNC_EXIT_RC(rc);
1393  return rc;
1394 }
1395 #endif
1396 
1403 static int clientCompareConnectCommand(void* a, void* b)
1404 {
1407  if (cmd1->client == cmd2->client)
1408  {
1409  if (cmd1->command.type == cmd2->command.type)
1410  {
1411  if (cmd1->command.type == CONNECT || cmd1->command.type == DISCONNECT)
1412  {
1413  return 1; //Item found in the list
1414  }
1415  }
1416  }
1417  return 0; //Item NOT found in the list
1418 }
1419 
/**
 * Add a command to the commands list and signal that there is work to send.
 *
 * CONNECT commands and internal DISCONNECT commands are placed at the head of the list
 * (a command duplicating the client and type of the current head is freed instead). All other
 * commands are appended and, when the client has persistence, persisted unless they are QoS 0
 * publishes with the persistQoS0 create option set to 0. A persisted PUBLISH keeps only its
 * persistence key in memory: payload and destination name are freed. When a PUBLISH is added
 * and the client's buffered-message limit is reached, the oldest PUBLISH of that client is dropped.
 *
 * @param command the command to add
 * @param command_size size value handed on to the list functions
 * @return the result of Thread_signal_cond (or Thread_post_sem on Windows)
 *
 * NOTE(review): source lines 1425, 1428, 1474, 1490 and 1513 are missing from this listing.
 */
1420 static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
1421 {
1422  int rc = MQTTASYNC_SUCCESS;
1423 
1424  FUNC_ENTRY;
1426  /* Don't set start time if the connect command is already in process #218 */
 /* NOTE(review): the statement controlled by the next "if" (source line 1428) is missing from this listing */
1427  if ((command->command.type != CONNECT) || (command->client->c->connect_state == NOT_IN_PROGRESS))
1429  if (command->command.type == CONNECT ||
1430  (command->command.type == DISCONNECT && command->command.details.dis.internal))
1431  {
1432  MQTTAsync_queuedCommand* head = NULL;
1433 
1434  if (commands->first)
1435  head = (MQTTAsync_queuedCommand*)(commands->first->content);
1436 
1437  if (head != NULL && head->client == command->client && head->command.type == command->command.type)
1438  MQTTAsync_freeCommand(command); /* ignore duplicate connect or disconnect command */
1439  else
1440  {
1441  ListRemoveItem(commands, command, clientCompareConnectCommand); /* remove command from the list if already there */
1442  ListInsert(commands, command, command_size, commands->first); /* add to the head of the list */
1443  }
1444  }
1445  else
1446  {
1447  ListAppend(commands, command, command_size);
1448 #if !defined(NO_PERSISTENCE)
1449  if (command->client->c->persistence)
1450  {
1451  if (command->command.type == PUBLISH &&
1452  command->client->createOptions && command->client->createOptions->struct_version >= 2 &&
1453  command->client->createOptions->persistQoS0 == 0 && command->command.details.pub.qos == 0)
1454  ; /* don't persist QoS0 if that create option is set to 0 */
1455  else
1456  {
1457  int rc = MQTTAsync_persistCommand(command);
1458  if (command->command.type == PUBLISH && rc == 0)
1459  {
1460  char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
1461 
1462  command->not_restored = 1;
1463  if (command->client->c->MQTTVersion >= MQTTVERSION_5)
1464  sprintf(key, "%s%u", PERSISTENCE_V5_COMMAND_KEY, command->seqno);
1465  else
1466  sprintf(key, "%s%u", PERSISTENCE_COMMAND_KEY, command->seqno);
 /* room for the whole key plus its terminating NUL */
1467  command->key = malloc(strlen(key) + 1);
1468  strcpy(command->key, key);
1469 
1470  free(command->command.details.pub.payload);
1471  command->command.details.pub.payload = NULL;
1472  free(command->command.details.pub.destinationName);
1473  command->command.details.pub.destinationName = NULL;
1475  }
1476  }
1477  }
1478 #endif
1479  if (command->command.type == PUBLISH)
1480  {
1481  /* delete oldest message if buffer is full. We wouldn't be here if delete newest was in operation */
1482  if (command->client->createOptions && (command->client->noBufferedMessages >= command->client->createOptions->maxBufferedMessages))
1483  {
1484  MQTTAsync_queuedCommand* first_publish = NULL;
1485  ListElement* current = NULL;
1486 
1487  /* Find first publish command for this client and detach it */
1488  while (ListNextElement(commands, &current))
1489  {
 /* NOTE(review): the declaration of cmd (source line 1490) is missing from this listing */
1491 
1492  if (cmd->client == command->client && cmd->command.type == PUBLISH)
1493  {
1494  first_publish = cmd;
1495  break;
1496  }
1497  }
1498  if (first_publish)
1499  {
1500  ListDetach(commands, first_publish);
1501 
 /* unpersist while the command is still valid, and only then free it */
1502  #if !defined(NO_PERSISTENCE)
1503  if (command->client->c->persistence)
1504  MQTTAsync_unpersistCommand(first_publish);
1505  #endif
1506  MQTTAsync_freeCommand(first_publish);
1507  }
1508  }
1509  else
1510  command->client->noBufferedMessages++;
1511  }
1512  }
1514 #if !defined(_WIN32) && !defined(_WIN64)
1515  rc = Thread_signal_cond(send_cond);
1516  if (rc != 0)
1517  Log(LOG_ERROR, 0, "Error %d from signal cond", rc);
1518 #else
1519  rc = Thread_post_sem(send_sem);
1520 #endif
1521  FUNC_EXIT_RC(rc);
1522  return rc;
1523 }
1524 
1525 
/*
 * Fragment of a function that operates on a client structure m: when m->retrying is not
 * already set, it is set to 1.
 * NOTE(review): the function header and several statements (source lines 1526, 1528, 1530,
 * 1533, 1537 and 1540) are missing from this listing, so the remaining behaviour cannot be
 * told from here.
 */
1527 {
1529  {
1531  if (m->retrying)
1532  {
1534  }
1535  else
1536  {
1538  m->retrying = 1;
1539  }
1541  }
1542 }
1543 
1544 
/*
 * Request a reconnect for the client identified by handle.
 *
 * With automatic reconnect enabled, the request only succeeds when the client should be
 * connected: reconnectNow is set and the retrying flag is raised if it was not already.
 * Without automatic reconnect, a copy of the stored connect command is queued through
 * MQTTAsync_addCommand.
 *
 * Returns MQTTASYNC_SUCCESS, MQTTASYNC_FAILURE (automatic reconnect and the client should
 * not be connected), PAHO_MEMORY_ERROR when conn is NULL, or the result of MQTTAsync_addCommand.
 *
 * NOTE(review): the function header and source lines 1551, 1560-1561, 1570 and 1586 are
 * missing from this listing (among them the line that produces conn).
 */
1546 {
1547  int rc = MQTTASYNC_FAILURE;
1548  MQTTAsyncs* m = handle;
1549 
1550  FUNC_ENTRY;
1552 
1553  if (m->automaticReconnect)
1554  {
1555  if (m->shouldBeConnected)
1556  {
1557  m->reconnectNow = 1;
1558  if (m->retrying == 0)
1559  {
1562  m->retrying = 1;
1563  }
1564  rc = MQTTASYNC_SUCCESS;
1565  }
1566  }
1567  else
1568  {
1569  /* to reconnect, put the connect command to the head of the command queue */
1571  if (!conn)
1572  {
1573  rc = PAHO_MEMORY_ERROR;
1574  goto exit;
1575  }
1576  memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
1577  conn->client = m;
1578  conn->command = m->connect;
1579  /* make sure that the version attempts are restarted */
1580  if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
1581  conn->command.details.conn.MQTTVersion = 0;
1582  rc = MQTTAsync_addCommand(conn, sizeof(m->connect));
1583  }
1584 
1585 exit:
1587  FUNC_EXIT_RC(rc);
1588  return rc;
1589 }
1590 
1591 
/*
 * Complete a disconnect once all inflight outbound messages have finished or the disconnect
 * timeout has elapsed.
 *
 * When that condition holds the session is closed. For an internal disconnect the
 * connectionLost callback (m->cl) is called if the client was connected; otherwise the
 * command's onSuccess or onSuccess5 callback is called. When the condition does not hold,
 * nothing is done.
 *
 * NOTE(review): the function header and source lines 1609, 1613, 1621 and 1623 (including
 * the declarations of the local data variables) are missing from this listing; the body uses
 * parameters named handle and command.
 */
1593 {
1594  MQTTAsyncs* m = handle;
1595 
1596  FUNC_ENTRY;
1597  /* wait for all inflight message flows to finish, up to timeout */;
1598  if (m->c->outboundMsgs->count == 0 || MQTTTime_elapsed(command->start_time) >= (ELAPSED_TIME_TYPE)command->details.dis.timeout)
1599  {
 /* remember the connection state before the session is closed */
1600  int was_connected = m->c->connected;
1601  MQTTAsync_closeSession(m->c, command->details.dis.reasonCode, &command->properties);
1602  if (command->details.dis.internal)
1603  {
1604  if (m->cl && was_connected)
1605  {
1606  Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
1607  (*(m->cl))(m->clContext, NULL);
1608  }
1610  }
1611  else if (command->onSuccess)
1612  {
1614 
1615  memset(&data, '\0', sizeof(data));
1616  Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
1617  (*(command->onSuccess))(command->context, &data);
1618  }
1619  else if (command->onSuccess5)
1620  {
1622 
1624  Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
1625  (*(command->onSuccess5))(command->context, &data);
1626  }
1627  }
1628  FUNC_EXIT;
1629 }
1630 
/*
 * Wrapper that returns the result of Socket_noPendingWrites for a socket.
 * NOTE(review): the function header and source lines 1637 and 1639 (the statements around
 * the call) are missing from this listing.
 */
1635 {
1636  int rc;
1638  rc = Socket_noPendingWrites(socket);
1640  return rc;
1641 }
1642 
/*
 * Walk state.pending_writes and remove every entry whose socket no longer has pending
 * writes, as reported by Socket_noPendingWrites.
 * NOTE(review): the function header and source line 1658 (a statement executed before an
 * entry is removed) are missing from this listing.
 */
1649 {
1650  FUNC_ENTRY;
1651  if (state.pending_writes.count > 0)
1652  {
1653  ListElement* le = state.pending_writes.first;
1654  while (le)
1655  {
1656  if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
1657  {
 /* point the list cursor at this element so that the cursor holds the
  * following element once ListRemove returns */
1659  state.pending_writes.current = le;
1660  ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
1661  le = state.pending_writes.current;
1662  }
1663  else
1664  ListNextElement(&(state.pending_writes), &le);
1665  }
1666  }
1667  FUNC_EXIT;
1668 }
1669 
1670 
/*
 * Free every server URI string held by the client structure m and the array that holds
 * them, then reset serverURIcount to 0 and serverURIs to NULL.
 * NOTE(review): the function header (source line 1671) is missing from this listing.
 */
1672 {
1673  int i;
1674 
1675  for (i = 0; i < m->serverURIcount; ++i)
1676  free(m->serverURIs[i]);
1677  m->serverURIcount = 0;
1678  if (m->serverURIs)
1679  free(m->serverURIs);
1680  m->serverURIs = NULL;
1681 }
1682 
1683 
/*
 * Free the heap data owned by a queued command, but not the command structure itself.
 *
 * SUBSCRIBE: each topic string, the topics array and the qoss array.
 * UNSUBSCRIBE: each topic string and the topics array.
 * PUBLISH: the destination name and the payload, when set.
 * In addition the persistence key is freed when not_restored is set and key is not NULL.
 *
 * NOTE(review): the function header (source line 1684) and source line 1718 are missing
 * from this listing; the body uses a parameter named command.
 */
1685 {
1686  if (command->command.type == SUBSCRIBE)
1687  {
1688  int i;
1689 
1690  for (i = 0; i < command->command.details.sub.count; i++)
1691  free(command->command.details.sub.topics[i]);
1692 
1693  free(command->command.details.sub.topics);
1694  command->command.details.sub.topics = NULL;
1695  free(command->command.details.sub.qoss);
1696  command->command.details.sub.qoss = NULL;
1697  }
1698  else if (command->command.type == UNSUBSCRIBE)
1699  {
1700  int i;
1701 
1702  for (i = 0; i < command->command.details.unsub.count; i++)
1703  free(command->command.details.unsub.topics[i]);
1704 
1705  free(command->command.details.unsub.topics);
1706  command->command.details.unsub.topics = NULL;
1707  }
1708  else if (command->command.type == PUBLISH)
1709  {
1710  /* qos 1 and 2 topics are freed in the protocol code when the flows are completed */
1711  if (command->command.details.pub.destinationName)
1712  free(command->command.details.pub.destinationName);
1713  command->command.details.pub.destinationName = NULL;
1714  if (command->command.details.pub.payload)
1715  free(command->command.details.pub.payload);
1716  command->command.details.pub.payload = NULL;
1717  }
1719  if (command->not_restored && command->key)
1720  free(command->key);
1721 }
1722 
/*
 * Free a queued command completely: first the data it owns (MQTTAsync_freeCommand1),
 * then the command structure itself. The pointer must not be used afterwards.
 * NOTE(review): the function header (source line 1723) is missing from this listing.
 */
1724 {
1725  MQTTAsync_freeCommand1(command);
1726  free(command);
1727 }
1728 
1729 
/*
 * Handle the completion of a partial write on a socket.
 *
 * The client using the socket is looked up in handles and its lastSent time is updated.
 * If the client has a pending write flagged, the matching entry of its responses list is
 * searched for. For a PUBLISH, rc == 1 with QoS 0 calls the publish success callback,
 * rc == -1 calls the publish failure callback, and any other case keeps the response.
 * A response that is not kept is detached and freed. The pending write flag is cleared at the end.
 *
 * socket: the socket whose write has completed
 * rc: completion code; the values 1 and -1 are tested explicitly below
 *
 * NOTE(review): source lines 1737, 1750, 1769, 1782, 1799 and 1809 are missing from this
 * listing; among them are the declarations of the local variables command and data.
 */
1730 static void MQTTAsync_writeComplete(int socket, int rc)
1731 {
1732  ListElement* found = NULL;
1733 
1734  FUNC_ENTRY;
1735  /* a partial write is now complete for a socket - this will be on a publish*/
1736 
1738 
1739  /* find the client using this socket */
1740  if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
1741  {
1742  MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
1743 
1744  m->c->net.lastSent = MQTTTime_now();
1745 
1746  /* see if there is a pending write flagged */
1747  if (m->pending_write)
1748  {
1749  ListElement* cur_response = NULL;
1751  MQTTAsync_queuedCommand* com = NULL;
1752 
1753  cur_response = NULL;
 /* look for the response whose embedded command is the pending write */
1754  while (ListNextElement(m->responses, &cur_response))
1755  {
1756  com = (MQTTAsync_queuedCommand*)(cur_response->content);
1757  if (&com->command == m->pending_write)
1758  break;
1759  }
1760 
1761  if (cur_response) /* we found a response */
1762  {
1763  if (command->type == PUBLISH)
1764  {
1765  if (rc == 1 && command->details.pub.qos == 0)
1766  {
1767  if (command->onSuccess)
1768  {
1770 
1771  data.token = command->token;
1772  data.alt.pub.destinationName = command->details.pub.destinationName;
1773  data.alt.pub.message.payload = command->details.pub.payload;
1774  data.alt.pub.message.payloadlen = command->details.pub.payloadlen;
1775  data.alt.pub.message.qos = command->details.pub.qos;
1776  data.alt.pub.message.retained = command->details.pub.retained;
1777  Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
1778  (*(command->onSuccess))(command->context, &data);
1779  }
1780  else if (command->onSuccess5)
1781  {
1783 
1784  data.token = command->token;
1785  data.alt.pub.destinationName = command->details.pub.destinationName;
1786  data.alt.pub.message.payload = command->details.pub.payload;
1787  data.alt.pub.message.payloadlen = command->details.pub.payloadlen;
1788  data.alt.pub.message.qos = command->details.pub.qos;
1789  data.alt.pub.message.retained = command->details.pub.retained;
1790  data.properties = command->properties;
1791  Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
1792  (*(command->onSuccess5))(command->context, &data);
1793  }
1794  }
1795  else if (rc == -1)
1796  {
1797  if (command->onFailure)
1798  {
1800 
1801  data.token = command->token;
1802  data.code = rc;
1803  data.message = NULL;
1804  Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
1805  (*(command->onFailure))(command->context, &data);
1806  }
1807  else if (command->onFailure5)
1808  {
1810 
1811  data.token = command->token;
1812  data.code = rc;
1813  data.message = NULL;
1814  data.packet_type = PUBLISH;
1815  Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
1816  (*(command->onFailure5))(command->context, &data);
1817  }
1818  }
1819  else
1820  com = NULL; /* Don't delete response we haven't acknowledged */
1821  /* QoS 0 payloads are freed elsewhere after a write complete,
1822  * so we should indicate that.
1823  */
1824  if (command->details.pub.qos == 0)
1825  command->details.pub.payload = NULL;
1826  }
1827  if (com)
1828  {
1829  Log(TRACE_PROTOCOL, -1, "writeComplete: Removing response for msgid %d", com->command.token);
1830  ListDetach(m->responses, com);
1831  MQTTAsync_freeCommand(com);
1832  }
1833  } /* if cur_response */
1834  m->pending_write = NULL;
1835  } /* if pending_write */
1836  }
1837  FUNC_EXIT;
1838 }
1839 
1840 
/*
 * Take the next runnable command from the commands list and execute it.
 *
 * Selection: the list is scanned in order; once a command of a client has been passed over,
 * all later commands of that client are passed over too (tracked in ignored_clients).
 * A candidate is also passed over when no message ids are left or when the client's
 * maxInflightMessages limit is reached. The chosen command is detached from the list and,
 * when the client has persistence, restored from persistence if necessary and unpersisted.
 *
 * Execution depends on the command type (CONNECT, SUBSCRIBE, UNSUBSCRIBE, PUBLISH,
 * DISCONNECT). Afterwards the command is freed, re-queued (connect with further
 * URIs/versions to try), reported through its failure callback, or appended to the
 * client's responses list to wait for the response.
 *
 * Returns 1 when a command was taken from the list, 0 when there was nothing to do.
 *
 * NOTE(review): the function header and many source lines (for example 1844, 1849-1850,
 * 1861, 1867, 1910, 1924, 2003, 2052, 2077, 2090, 2126, 2129, 2136, 2139, 2160, 2171, 2176,
 * 2200, 2210, 2219 and 2228) are missing from this listing, including the declarations of
 * command, cmd and the local data variables.
 */
1842 {
1843  int rc = 0;
1845  ListElement* cur_command = NULL;
1846  List* ignored_clients = NULL;
1847 
1848  FUNC_ENTRY;
1851 
1852  /* only the first command in the list must be processed for any particular client, so if we skip
1853  a command for a client, we must skip all following commands for that client. Use a list of
1854  ignored clients to keep track
1855  */
1856  ignored_clients = ListInitialize();
1857 
1858  /* don't try a command until there isn't a pending write for that client, and we are not connecting */
1859  while (ListNextElement(commands, &cur_command))
1860  {
1862 
1863  if (ListFind(ignored_clients, cmd->client))
1864  continue;
1865 
 /* NOTE(review): the rest of the next condition (source line 1867) is missing from this listing */
1866  if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected &&
1868  {
1869  if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
1870  cmd->client->c->outboundMsgs->count >= MAX_MSG_ID - 1)
1871  {
1872  ; /* no more message ids available */
1873  }
1874  else if (((cmd->command.type == PUBLISH && cmd->command.details.pub.qos > 0) ||
1875  cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
1876  (cmd->client->c->outboundMsgs->count >= cmd->client->c->maxInflightMessages))
1877  {
1878  Log(TRACE_MIN, -1, "Blocking on server receive maximum for client %s",
1879  cmd->client->c->clientID); /* flow control */
1880  }
1881  else
1882  {
1883  command = cmd;
1884  break;
1885  }
1886  }
 /* this client's command was passed over: ignore its later commands in this scan */
1887  ListAppend(ignored_clients, cmd->client, sizeof(cmd->client));
1888  }
1889  ListFreeNoContent(ignored_clients);
1890  if (command)
1891  {
1892  if (command->command.type == PUBLISH)
1893  command->client->noBufferedMessages--;
1894  ListDetach(commands, command);
1895 #if !defined(NO_PERSISTENCE)
1896  /*printf("outboundmsgs count %d max inflight %d qos %d %d %d\n", command->client->c->outboundMsgs->count, command->client->c->maxInflightMessages,
1897  command->command.details.pub.qos, command->client->c->MQTTVersion, command->command.type);*/
1898  if (command->client->c->persistence)
1899  {
1900  if (command->not_restored)
1901  {
1902  char* buffer = NULL;
1903  int buflen = 0;
1904 
1905  if ((rc = command->client->c->persistence->pget(command->client->c->phandle, command->key, &buffer, &buflen)) == 0
1906  && (command->client->c->afterRead == NULL ||
1907  (rc = command->client->c->afterRead(command->client->c->afterRead_context, &buffer, &buflen)) == 0))
1908  {
 /* NOTE(review): the continuation of the next expression (source line 1910) is missing from this listing */
1909  int MQTTVersion = (strncmp(command->key, PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
1911  free(command->key);
1912  command->key = NULL;
1913  command = MQTTAsync_restoreCommand(buffer, buflen, MQTTVersion, command);
1914  if (buffer)
1915  free(buffer);
1916  }
1917  else
1918  Log(LOG_ERROR, -1, "Error restoring command: rc %d from pget\n", rc);
1919  }
1920  MQTTAsync_unpersistCommand(command);
1921  }
1922 #endif
1923  }
1925 
1926  if (!command)
1927  goto exit; /* nothing to do */
1928 
1929  if (command->command.type == CONNECT)
1930  {
1931  if (command->client->c->connect_state != NOT_IN_PROGRESS || command->client->c->connected)
1932  rc = 0;
1933  else
1934  {
1935  char* serverURI = command->client->serverURI;
1936 
1937  if (command->client->serverURIcount > 0)
1938  {
1939  if (command->command.details.conn.currentURI < command->client->serverURIcount)
1940  {
1941  serverURI = command->client->serverURIs[command->command.details.conn.currentURI];
1942 
 /* skip the URI scheme prefix and record the transport it selects */
1943  if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
1944  serverURI += strlen(URI_TCP);
1945  else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
1946  {
1947  serverURI += strlen(URI_WS);
1948  command->client->websocket = 1;
1949  }
1950 #if defined(OPENSSL)
1951  else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
1952  {
1953  serverURI += strlen(URI_SSL);
1954  command->client->ssl = 1;
1955  }
1956  else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
1957  {
1958  serverURI += strlen(URI_WSS);
1959  command->client->ssl = 1;
1960  command->client->websocket = 1;
1961  }
1962 #endif
1963  }
1964  }
1965 
 /* with the default version setting, step from default to 3.1.1 and from 3.1.1 to 3.1 */
1966  if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
1967  {
1968  if (command->command.details.conn.MQTTVersion == MQTTVERSION_DEFAULT)
1969  command->command.details.conn.MQTTVersion = MQTTVERSION_3_1_1;
1970  else if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1_1)
1971  command->command.details.conn.MQTTVersion = MQTTVERSION_3_1;
1972  }
1973  else
1974  command->command.details.conn.MQTTVersion = command->client->c->MQTTVersion;
1975 
1976  Log(TRACE_PROTOCOL, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, command->command.details.conn.MQTTVersion);
1977 #if defined(OPENSSL)
1978 #if defined(__GNUC__) && defined(__linux__)
1979  rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl, command->client->websocket,
1980  command->command.details.conn.MQTTVersion, command->client->connectProps, command->client->willProps, 100);
1981 #else
1982  rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl, command->client->websocket,
1983  command->command.details.conn.MQTTVersion, command->client->connectProps, command->client->willProps);
1984 #endif
1985 #else
1986 #if defined(__GNUC__) && defined(__linux__)
1987  rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->websocket,
1988  command->command.details.conn.MQTTVersion, command->client->connectProps, command->client->willProps, 100);
1989 #else
1990  rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->websocket,
1991  command->command.details.conn.MQTTVersion, command->client->connectProps, command->client->willProps);
1992 #endif
1993 #endif
1994 
1995  if (command->client->c->connect_state == NOT_IN_PROGRESS)
1996  rc = SOCKET_ERROR;
1997 
1998  /* if the TCP connect is pending, then we must call select to determine when the connect has completed,
1999  which is indicated by the socket being ready *either* for reading *or* writing. The next couple of lines
2000  make sure we check for writeability as well as readability, otherwise we wait around longer than we need to
2001  in Socket_getReadySocket() */
 /* NOTE(review): the statement controlled by the next "if" (source line 2003) is missing from this listing */
2002  if (rc == EINPROGRESS)
2004  }
2005  }
2006  else if (command->command.type == SUBSCRIBE)
2007  {
2008  List* topics = ListInitialize();
2009  List* qoss = ListInitialize();
2010  MQTTProperties* props = NULL;
2011  MQTTSubscribe_options* subopts = NULL;
2012  int i;
2013 
2014  for (i = 0; i < command->command.details.sub.count; i++)
2015  {
2016  ListAppend(topics, command->command.details.sub.topics[i], strlen(command->command.details.sub.topics[i]));
2017  ListAppend(qoss, &command->command.details.sub.qoss[i], sizeof(int));
2018  }
2019  if (command->client->c->MQTTVersion >= MQTTVERSION_5)
2020  {
2021  props = &command->command.properties;
2022  if (command->command.details.sub.count > 1)
2023  subopts = command->command.details.sub.optlist;
2024  else
2025  subopts = &command->command.details.sub.opts;
2026  }
2027  rc = MQTTProtocol_subscribe(command->client->c, topics, qoss, command->command.token, subopts, props);
2028  ListFreeNoContent(topics);
2029  ListFreeNoContent(qoss);
2030  if (command->client->c->MQTTVersion >= MQTTVERSION_5 && command->command.details.sub.count > 1)
2031  free(command->command.details.sub.optlist);
2032  }
2033  else if (command->command.type == UNSUBSCRIBE)
2034  {
2035  List* topics = ListInitialize();
2036  MQTTProperties* props = NULL;
2037  int i;
2038 
2039  for (i = 0; i < command->command.details.unsub.count; i++)
2040  ListAppend(topics, command->command.details.unsub.topics[i], strlen(command->command.details.unsub.topics[i]));
2041 
2042  if (command->client->c->MQTTVersion >= MQTTVERSION_5)
2043  props = &command->command.properties;
2044 
2045  rc = MQTTProtocol_unsubscribe(command->client->c, topics, command->command.token, props);
2046  ListFreeNoContent(topics);
2047  }
2048  else if (command->command.type == PUBLISH)
2049  {
2050  Messages* msg = NULL;
2051  Publish* p = NULL;
2053 
2054  if ((p = malloc(sizeof(Publish))) == NULL)
2055  {
2056  rc = PAHO_MEMORY_ERROR;
2057  goto exit;
2058  }
2059 
2060  p->payload = command->command.details.pub.payload;
2061  p->payloadlen = command->command.details.pub.payloadlen;
2062  p->topic = command->command.details.pub.destinationName;
2063  p->msgId = command->command.token;
2064  p->MQTTVersion = command->client->c->MQTTVersion;
2065  p->properties = initialized;
2066  if (p->MQTTVersion >= MQTTVERSION_5)
2067  p->properties = command->command.properties;
2068 
2069  rc = MQTTProtocol_startPublish(command->client->c, p, command->command.details.pub.qos, command->command.details.pub.retained, &msg);
2070 
2071  if (command->command.details.pub.qos == 0)
2072  {
2073  if (rc == TCPSOCKET_COMPLETE)
2074  {
2075  if (command->command.onSuccess)
2076  {
2078 
2079  data.token = command->command.token;
2080  data.alt.pub.destinationName = command->command.details.pub.destinationName;
2081  data.alt.pub.message.payload = command->command.details.pub.payload;
2082  data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
2083  data.alt.pub.message.qos = command->command.details.pub.qos;
2084  data.alt.pub.message.retained = command->command.details.pub.retained;
2085  Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
2086  (*(command->command.onSuccess))(command->command.context, &data);
2087  }
2088  else if (command->command.onSuccess5)
2089  {
2091 
2092  data.token = command->command.token;
2093  data.alt.pub.destinationName = command->command.details.pub.destinationName;
2094  data.alt.pub.message.payload = command->command.details.pub.payload;
2095  data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
2096  data.alt.pub.message.qos = command->command.details.pub.qos;
2097  data.alt.pub.message.retained = command->command.details.pub.retained;
2098  data.properties = command->command.properties;
2099  Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
2100  (*(command->command.onSuccess5))(command->command.context, &data);
2101  }
2102  }
2103  else
2104  {
2105  if (rc != SOCKET_ERROR)
2106  command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
2107  command->client->pending_write = &command->command;
2108  }
2109  }
2110  else
2111  {
2112  command->command.details.pub.payload = NULL; /* this will be freed by the protocol code */
2113  command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
2114  }
2115  free(p); /* should this be done if the write isn't complete? */
2116  }
2117  else if (command->command.type == DISCONNECT)
2118  {
2119  if (command->client->c->connect_state != NOT_IN_PROGRESS || command->client->c->connected != 0)
2120  {
2121  if (command->client->c->connect_state != NOT_IN_PROGRESS)
2122  {
 /* a connect is still in progress: report it as failed before disconnecting */
2123  command->client->c->connect_state = DISCONNECTING;
2124  if (command->client->connect.onFailure)
2125  {
2127 
2128  data.token = 0;
2130  data.message = NULL;
2131  Log(TRACE_MIN, -1, "Calling connect failure for client %s", command->client->c->clientID);
2132  (*(command->client->connect.onFailure))(command->client->connect.context, &data);
2133  }
2134  else if (command->client->connect.onFailure5)
2135  {
2137 
2138  data.token = 0;
2140  data.message = NULL;
2141  Log(TRACE_MIN, -1, "Calling connect failure for client %s", command->client->c->clientID);
2142  (*(command->client->connect.onFailure5))(command->client->connect.context, &data);
2143  }
2144  }
2145  MQTTAsync_checkDisconnect(command->client, &command->command);
2146  }
2147  }
2148 
 /* decide what happens to the command now that it has been executed */
2149  if (command->command.type == CONNECT && rc != SOCKET_ERROR && rc != MQTTASYNC_PERSISTENCE_ERROR)
2150  {
2151  command->client->connect = command->command;
2152  MQTTAsync_freeCommand(command);
2153  }
2154  else if (command->command.type == DISCONNECT)
2155  {
2156  command->client->disconnect = command->command;
2157  MQTTAsync_freeCommand(command);
2158  }
 /* NOTE(review): the rest of the next condition (source line 2160) is missing from this listing */
2159  else if (command->command.type == PUBLISH && command->command.details.pub.qos == 0 &&
2161  {
2162  if (rc == TCPSOCKET_INTERRUPTED)
2163  ListAppend(command->client->responses, command, sizeof(command));
2164  else
2165  MQTTAsync_freeCommand(command);
2166  }
2167  else if (rc == SOCKET_ERROR || rc == MQTTASYNC_PERSISTENCE_ERROR)
2168  {
2169  if (command->command.type == CONNECT)
2170  {
2172  MQTTAsync_disconnect(command->client, &opts); /* not "internal" because we don't want to call connection lost */
2173  command->client->shouldBeConnected = 1; /* as above call is not "internal" we need to reset this */
2174  }
 /* NOTE(review): the statement controlled by the next "else" (source line 2176) is missing from this listing */
2175  else
2177 
2178  if (command->command.type == CONNECT
2179  && MQTTAsync_checkConn(&command->command, command->client))
2180  {
2181  Log(TRACE_MIN, -1, "Connect failed, more to try");
2182 
2183  if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
2184  {
2185  if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
2186  {
2187  command->command.details.conn.currentURI++;
2188  command->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
2189  }
2190  } else
2191  command->command.details.conn.currentURI++; /* Here currentURI becomes larger than command->client->serverURIcount. This needs to be handled to avoid segmentation faults! */
2192 
2193  /* put the connect command back to the head of the command queue, using the next serverURI */
2194  rc = MQTTAsync_addCommand(command,
2195  sizeof(command->command.details.conn));
2196  } else
2197  {
2198  if (command->command.onFailure)
2199  {
2201 
2202  data.token = 0;
2203  data.code = rc;
2204  data.message = NULL;
2205  Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
2206  (*(command->command.onFailure))(command->command.context, &data);
2207  }
2208  else if (command->command.onFailure5)
2209  {
2211 
2212  data.code = rc;
2213  Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
2214  (*(command->command.onFailure5))(command->command.context, &data);
2215  }
2216  if (command->command.type == CONNECT)
2217  {
2218  command->client->connect = command->command;
2220  }
2221  MQTTAsync_freeCommand(command); /* free up the command if necessary */
2222  }
2223  }
2224  else /* put the command into a waiting for response queue for each client, indexed by msgid */
2225  ListAppend(command->client->responses, command, sizeof(command));
2226 
2227 exit:
 /* the return value only tells whether a command was taken from the list */
2229  rc = (command != NULL);
2230  FUNC_EXIT_RC(rc);
2231  return rc;
2232 }
2233 
2234 
/*
 * Decide what happens after a connect attempt for client m has not succeeded.
 * If MQTTAsync_checkConn() returns non-zero for the stored connect command, a copy of
 * m->connect is queued again through MQTTAsync_addCommand(). Otherwise the stored connect
 * command's onFailure callback (or onFailure5, if onFailure is not set) is invoked.
 * @param m the client structure whose connect attempt failed
 * @param rc the code stored in the failure data handed to the callback
 * @param message the message stored in the failure data handed to the callback
 *
 * NOTE(review): this listing omits several lines of the function, among them the
 * declarations of conn and data - check against the complete file before changing it.
 */
2235 static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
2236 {
2237  FUNC_ENTRY;
2238  if (MQTTAsync_checkConn(&m->connect, m))
2239  {
2241 
2243  /* put the connect command back to the head of the command queue, using the next serverURI */
2244  if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
2245  goto exit;
2246  memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
2247  conn->client = m;
 /* the queued command starts as a copy of the stored connect command */
2248  conn->command = m->connect;
2249  Log(TRACE_MIN, -1, "Connect failed, more to try");
2250 
 /* When the client is configured with MQTTVERSION_DEFAULT, the URI index is only advanced
  * (and the command's version reset to MQTTVERSION_DEFAULT) if the command's version is
  * MQTTVERSION_3_1; for any other command version nothing is changed in this block.
  * For every other client version setting the URI index is advanced unconditionally. */
2251  if (conn->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
2252  {
2253  if (conn->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
2254  {
2255  conn->command.details.conn.currentURI++;
2256  conn->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
2257  }
2258  }
2259  else
2260  conn->command.details.conn.currentURI++;
2261 
 /* NOTE(review): the return value of MQTTAsync_addCommand() is not checked here */
2262  MQTTAsync_addCommand(conn, sizeof(m->connect));
2263  }
2264  else
2265  {
 /* nothing more to try: report the failure through whichever callback is set */
2267  if (m->connect.onFailure)
2268  {
2270 
2271  data.token = 0;
2272  data.code = rc;
2273  data.message = message;
2274  Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
2275  (*(m->connect.onFailure))(m->connect.context, &data);
2276  }
2277  else if (m->connect.onFailure5)
2278  {
2280 
2281  data.token = 0;
2282  data.code = rc;
2283  data.message = message;
2284  Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
2285  (*(m->connect.onFailure5))(m->connect.context, &data);
2286  }
2288  }
2289 exit:
2290  FUNC_EXIT;
2291 }
2292 
2293 
/*
 * Walk every client in the handles list and act on timeouts and pending automatic reconnects.
 * The body is skipped unless MQTTTime_difftime() reports at least 3000 since the previous
 * run (presumably milliseconds - TODO confirm); the time of the previous run is kept in a
 * function-local static.
 * For a client with automaticReconnect and retrying set, a copy of its stored connect command
 * is queued with MQTTAsync_addCommand(), after giving the updateConnectOptions callback (if
 * set) the chance to replace the username and password.
 *
 * NOTE(review): this listing omits several lines of the function (the statements guarded by
 * the disconnect check, the connect-timeout condition, the reconnect condition and the
 * declarations of conn and connectData) - check against the complete file before changing it.
 */
2294 static void MQTTAsync_checkTimeouts(void)
2295 {
2296  ListElement* current = NULL;
 /* remembers when the checks below were last carried out */
2297  static START_TIME_TYPE last = START_TIME_ZERO;
2298  START_TIME_TYPE now;
2299 
2300  FUNC_ENTRY;
2302  now = MQTTTime_now();
2303  if (MQTTTime_difftime(now, last) < (DIFF_TIME_TYPE)3000)
2304  goto exit;
2305  last = now;
2306  while (ListNextElement(handles, &current)) /* for each client */
2307  {
2308  MQTTAsyncs* m = (MQTTAsyncs*)(current->content);
2309 
2310  /* check disconnect timeout */
2311  if (m->c->connect_state == DISCONNECTING)
2313 
2314  /* check connect timeout */
2316  {
2317  nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect timeout");
2318  continue;
2319  }
2320 
2321  /* There was a section here that removed timed-out responses. But if the command had completed and
2322  * there was a response, then we may as well report it, no?
2323  *
2324  * In any case, that section was disabled when automatic reconnect was implemented.
2325  */
2326 
2327  if (m->automaticReconnect && m->retrying)
2328  {
2330  {
2331  /* to reconnect put the connect command to the head of the command queue */
 /* allocation failure abandons the whole pass, including the remaining clients */
2333  if (!conn)
2334  goto exit;
2335  memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
2336  conn->client = m;
2337  conn->command = m->connect;
2338  /* make sure that the version attempts are restarted */
2339  if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
2340  conn->command.details.conn.MQTTVersion = 0;
2341  if (m->updateConnectOptions)
2342  {
2344  int callback_rc = MQTTASYNC_SUCCESS;
2345 
 /* the callback is handed the credentials currently stored for the client */
2346  connectData.username = m->c->username;
2347  connectData.binarypwd.data = m->c->password;
2348  connectData.binarypwd.len = m->c->passwordlen;
2349  Log(TRACE_MIN, -1, "Calling updateConnectOptions for client %s", m->c->clientID);
2350  callback_rc = (*(m->updateConnectOptions))(m->updateConnectOptions_context, &connectData);
2351 
 /* only a return value of 1 causes the stored credentials to be updated */
2352  if (callback_rc == 1)
2353  {
 /* a changed pointer is taken to mean a changed value: free the old one, store a copy of the new */
2354  if (connectData.username != m->c->username)
2355  {
2356  if (m->c->username)
2357  free((void*)m->c->username);
2358  if (connectData.username)
2359  m->c->username = MQTTStrdup(connectData.username);
2360  else
2361  m->c->username = NULL;
2362  }
2363  if (connectData.binarypwd.data != m->c->password)
2364  {
2365  if (m->c->password)
2366  free((void*)m->c->password);
2367  if (connectData.binarypwd.data)
2368  {
2369  m->c->passwordlen = connectData.binarypwd.len;
 /* NOTE(review): if this malloc fails, password is left NULL while passwordlen stays non-zero */
2370  if ((m->c->password = malloc(m->c->passwordlen)))
2371  memcpy((void*)m->c->password, connectData.binarypwd.data, m->c->passwordlen);
2372  }
2373  else
2374  {
2375  m->c->password = NULL;
2376  m->c->passwordlen = 0;
2377  }
2378  }
2379  }
2380  }
2381  Log(TRACE_MIN, -1, "Automatically attempting to reconnect");
2382  MQTTAsync_addCommand(conn, sizeof(m->connect));
2383  m->reconnectNow = 0;
2384  }
2385  }
2386  }
2387 exit:
2389  FUNC_EXIT;
2390 }
2391 
2392 
2394 {
2395  FUNC_ENTRY;
2400  while (!tostop)
2401  {
2402  int rc;
2403 
2404  while (commands->count > 0)
2405  {
2406  if (MQTTAsync_processCommand() == 0)
2407  break; /* no commands were processed, so go into a wait */
2408  }
2409 #if !defined(_WIN32) && !defined(_WIN64)
2410  if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
2411  Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
2412 #else
2413  if ((rc = Thread_wait_sem(send_sem, 1000)) != 0 && rc != ETIMEDOUT)
2414  Log(LOG_ERROR, -1, "Error %d waiting for semaphore", rc);
2415 #endif
2416 
2418  }
2422  sendThread_id = 0;
2424  FUNC_EXIT;
2425 #if defined(_WIN32) || defined(_WIN64)
2426  ExitThread(0);
2427 #endif
2428  return 0;
2429 }
2430 
2431 
2433 {
2434  FUNC_ENTRY;
2435  /* empty message queue */
2436  if (client->messageQueue->count > 0)
2437  {
2438  ListElement* current = NULL;
2439  while (ListNextElement(client->messageQueue, &current))
2440  {
2441  qEntry* qe = (qEntry*)(current->content);
2442  free(qe->topicName);
2443  free(qe->msg->payload);
2444  free(qe->msg);
2445  }
2446  ListEmpty(client->messageQueue);
2447  }
2448  FUNC_EXIT;
2449 }
2450 
2451 
2453 {
2454  int count = 0;
2455 
2456  FUNC_ENTRY;
2457  if (m->responses)
2458  {
2459  ListElement* cur_response = NULL;
2460 
2461  while (ListNextElement(m->responses, &cur_response))
2462  {
2464 
2465  if (command->command.onFailure)
2466  {
2468 
2469  data.token = command->command.token;
2470  data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
2471  data.message = NULL;
2472 
2473  Log(TRACE_MIN, -1, "Calling %s failure for client %s",
2474  MQTTPacket_name(command->command.type), m->c->clientID);
2475  (*(command->command.onFailure))(command->command.context, &data);
2476  }
2477  else if (command->command.onFailure5)
2478  {
2480 
2481  data.token = command->command.token;
2482  data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
2483  data.message = NULL;
2484 
2485  Log(TRACE_MIN, -1, "Calling %s failure for client %s",
2486  MQTTPacket_name(command->command.type), m->c->clientID);
2487  (*(command->command.onFailure5))(command->command.context, &data);
2488  }
2489 
2490  MQTTAsync_freeCommand1(command);
2491  count++;
2492  }
2493  ListEmpty(m->responses);
2494  }
2495  Log(TRACE_MINIMUM, -1, "%d responses removed for client %s", count, m->c->clientID);
2496  FUNC_EXIT;
2497 }
2498 
2499 
2501 {
2502  int count = 0;
2503  ListElement* current = NULL;
2504  ListElement *next = NULL;
2505 
2506  FUNC_ENTRY;
2507  /* remove commands in the command queue relating to this client */
2508  current = ListNextElement(commands, &next);
2509  ListNextElement(commands, &next);
2510  while (current)
2511  {
2513 
2514  if (command->client == m)
2515  {
2516  ListDetach(commands, command);
2517 
2518  if (command->command.onFailure)
2519  {
2521 
2522  data.token = command->command.token;
2523  data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
2524  data.message = NULL;
2525 
2526  Log(TRACE_MIN, -1, "Calling %s failure for client %s",
2527  MQTTPacket_name(command->command.type), m->c->clientID);
2528  (*(command->command.onFailure))(command->command.context, &data);
2529  }
2530  else if (command->command.onFailure5)
2531  {
2533 
2534  data.token = command->command.token;
2535  data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
2536  data.message = NULL;
2537 
2538  Log(TRACE_MIN, -1, "Calling %s failure for client %s",
2539  MQTTPacket_name(command->command.type), m->c->clientID);
2540  (*(command->command.onFailure5))(command->command.context, &data);
2541  }
2542 
2543  MQTTAsync_freeCommand(command);
2544  count++;
2545  }
2546  current = next;
2547  ListNextElement(commands, &next);
2548  }
2549  Log(TRACE_MINIMUM, -1, "%d commands removed for client %s", count, m->c->clientID);
2550  FUNC_EXIT;
2551 }
2552 
2553 
2555 {
2556  MQTTAsyncs* m = *handle;
2557 
2558  FUNC_ENTRY;
2560 
2561  if (m == NULL)
2562  goto exit;
2563 
2565 
2568  ListFree(m->responses);
2569 
2570  if (m->c)
2571  {
2572  int saved_socket = m->c->net.socket;
2573  char* saved_clientid = MQTTStrdup(m->c->clientID);
2574 #if !defined(NO_PERSISTENCE)
2576 #endif
2579  if (!ListRemove(bstate->clients, m->c))
2580  Log(LOG_ERROR, 0, NULL);
2581  else
2582  Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
2583  free(saved_clientid);
2584  }
2585 
2586  if (m->serverURI)
2587  free(m->serverURI);
2588  if (m->createOptions)
2589  free(m->createOptions);
2591  if (m->connectProps)
2592  {
2594  free(m->connectProps);
2595  m->connectProps = NULL;
2596  }
2597  if (m->willProps)
2598  {
2600  free(m->willProps);
2601  m->willProps = NULL;
2602  }
2603  if (!ListRemove(handles, m))
2604  Log(LOG_ERROR, -1, "free error");
2605  *handle = NULL;
2606  if (bstate->clients->count == 0)
2608 
2609 exit:
2611  FUNC_EXIT;
2612 }
2613 
2614 
2616 {
2617  FUNC_ENTRY;
2618  MQTTProperties_free(&(*message)->properties);
2619  free((*message)->payload);
2620  free(*message);
2621  *message = NULL;
2622  FUNC_EXIT;
2623 }
2624 
2625 
2626 void MQTTAsync_free(void* memory)
2627 {
2628  FUNC_ENTRY;
2629  free(memory);
2630  FUNC_EXIT;
2631 }
2632 
2633 
2634 void* MQTTAsync_malloc(size_t size)
2635 {
2636  void* val;
2637  int rc = 0;
2638 
2639  FUNC_ENTRY;
2640  val = malloc(size);
2641  rc = (val != NULL);
2642  FUNC_EXIT_RC(rc);
2643  return val;
2644 }
2645 
2646 
2648 {
2649  int rc = MQTTASYNC_FAILURE;
2650 
2651  FUNC_ENTRY;
2652  if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
2653  {
2654  Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
2655  if ((rc = connack->rc) == MQTTASYNC_SUCCESS)
2656  {
2657  m->retrying = 0;
2658  m->c->connected = 1;
2659  m->c->good = 1;
2661  if (m->c->cleansession || m->c->cleanstart)
2662  rc = MQTTAsync_cleanSession(m->c);
2663  else if (m->c->MQTTVersion >= MQTTVERSION_3_1_1 && connack->flags.bits.sessionPresent == 0)
2664  {
2665  Log(LOG_PROTOCOL, -1, "Cleaning session state on connect because sessionPresent is 0");
2666  rc = MQTTAsync_cleanSession(m->c);
2667  }
2668  if (m->c->outboundMsgs->count > 0)
2669  {
2670  ListElement* outcurrent = NULL;
2672 
2673  while (ListNextElement(m->c->outboundMsgs, &outcurrent))
2674  {
2675  Messages* messages = (Messages*)(outcurrent->content);
2676  memset(&messages->lastTouch, '\0', sizeof(messages->lastTouch));
2677  }
2678  MQTTProtocol_retry(zero, 1, 1);
2679  if (m->c->connected != 1)
2681  }
2682  }
2683  m->pack = NULL;
2684 #if !defined(_WIN32) && !defined(_WIN64)
2685  Thread_signal_cond(send_cond);
2686 #else
2687  Thread_post_sem(send_sem);
2688 #endif
2689  }
2690  FUNC_EXIT_RC(rc);
2691  return rc;
2692 }
2693 
2694 
2695 /* This is the thread function that handles the calling of callback functions if set */
2697 {
2698  long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
2699 
2700  FUNC_ENTRY;
2704  while (!tostop)
2705  {
2706  int rc = SOCKET_ERROR;
2707  int sock = -1;
2708  MQTTAsyncs* m = NULL;
2709  MQTTPacket* pack = NULL;
2710 
2712  pack = MQTTAsync_cycle(&sock, timeout, &rc);
2714  if (tostop)
2715  break;
2716  timeout = 1000L;
2717 
2718  if (sock == 0)
2719  continue;
2720  /* find client corresponding to socket */
2721  if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
2722  {
2723  Log(TRACE_MINIMUM, -1, "Could not find client corresponding to socket %d", sock);
2724  /* Socket_close(sock); - removing socket in this case is not necessary (Bug 442400) */
2725  continue;
2726  }
2727  m = (MQTTAsyncs*)(handles->current->content);
2728  if (m == NULL)
2729  {
2730  Log(LOG_ERROR, -1, "Client structure was NULL for socket %d - removing socket", sock);
2731  Socket_close(sock);
2732  continue;
2733  }
2734  if (rc == SOCKET_ERROR)
2735  {
2736  Log(TRACE_MINIMUM, -1, "Error from MQTTAsync_cycle() - removing socket %d", sock);
2737  if (m->c->connected == 1)
2739  nextOrClose(m, rc, "socket error");
2740  }
2741  else
2742  {
2743  if (m->c->messageQueue->count > 0 && m->ma)
2744  {
2745  qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
2746  int topicLen = qe->topicLen;
2747 
2748  if (strlen(qe->topicName) == topicLen)
2749  topicLen = 0;
2750 
2751  if (MQTTAsync_deliverMessage(m, qe->topicName, topicLen, qe->msg))
2752  {
2753 #if !defined(NO_PERSISTENCE)
2754  if (m->c->persistence)
2756 #endif
2757  ListRemove(m->c->messageQueue, qe); /* qe is freed here */
2758  }
2759  else
2760  Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
2761  m->c->clientID);
2762  }
2763  if (pack)
2764  {
2765  if (pack->header.bits.type == CONNACK)
2766  {
2767  Connack* connack = (Connack*)pack;
2768  int sessionPresent = connack->flags.bits.sessionPresent;
2769 
2770  rc = MQTTAsync_completeConnection(m, connack);
2771  if (rc == MQTTASYNC_SUCCESS)
2772  {
2773  int onSuccess = 0;
2774  if ((m->serverURIcount > 0)
2775  && (m->connect.details.conn.currentURI < m->serverURIcount))
2776  {
2777  Log(TRACE_MIN, -1, "Connect succeeded to %s",
2778  m->serverURIs[m->connect.details.conn.currentURI]);
2779  }
2780  onSuccess = (m->connect.onSuccess != NULL ||
2781  m->connect.onSuccess5 != NULL); /* save setting of onSuccess callback */
2782  if (m->connect.onSuccess)
2783  {
2785  memset(&data, '\0', sizeof(data));
2786  Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID);
2787  if ((m->serverURIcount > 0)
2788  && (m->connect.details.conn.currentURI < m->serverURIcount))
2789  data.alt.connect.serverURI = m->serverURIs[m->connect.details.conn.currentURI];
2790  else
2791  data.alt.connect.serverURI = m->serverURI;
2792  data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion;
2793  data.alt.connect.sessionPresent = sessionPresent;
2794  (*(m->connect.onSuccess))(m->connect.context, &data);
2795  m->connect.onSuccess = NULL; /* don't accidentally call it again */
2796  }
2797  else if (m->connect.onSuccess5)
2798  {
2800  Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID);
2801  if (m->serverURIcount > 0)
2802  data.alt.connect.serverURI = m->serverURIs[m->connect.details.conn.currentURI];
2803  else
2804  data.alt.connect.serverURI = m->serverURI;
2805  data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion;
2806  data.alt.connect.sessionPresent = sessionPresent;
2807  data.properties = connack->properties;
2808  data.reasonCode = connack->rc;
2809  (*(m->connect.onSuccess5))(m->connect.context, &data);
2810  m->connect.onSuccess5 = NULL; /* don't accidentally call it again */
2811  }
2812  if (m->connected)
2813  {
2814  char* reason = (onSuccess) ? "connect onSuccess called" : "automatic reconnect";
2815  Log(TRACE_MIN, -1, "Calling connected for client %s", m->c->clientID);
2816  (*(m->connected))(m->connected_context, reason);
2817  }
2818  if (m->c->MQTTVersion >= MQTTVERSION_5)
2819  {
2821  {
2823  if (m->c->maxInflightMessages > recv_max)
2824  m->c->maxInflightMessages = recv_max;
2825  }
2826  }
2827  }
2828  else
2829  {
2830  nextOrClose(m, rc, "CONNACK return code");
2831  }
2832  MQTTPacket_freeConnack(connack);
2833  }
2834  else if (pack->header.bits.type == SUBACK)
2835  {
2836  ListElement* current = NULL;
2837 
2838  /* use the msgid to find the callback to be called */
2839  while (ListNextElement(m->responses, &current))
2840  {
2842  if (command->command.token == ((Suback*)pack)->msgId)
2843  {
2844  Suback* sub = (Suback*)pack;
2845  if (!ListDetach(m->responses, command)) /* remove the response from the list */
2846  Log(LOG_ERROR, -1, "Subscribe command not removed from command list");
2847 
2848  /* Call the failure callback if there is one subscribe in the MQTT packet and
2849  * the return code is 0x80 (failure). If the MQTT packet contains >1 subscription
2850  * request, then we call onSuccess with the list of returned QoSs, which inelegantly,
2851  * could include some failures, or worse, the whole list could have failed.
2852  */
2853  if (m->c->MQTTVersion >= MQTTVERSION_5)
2854  {
2855  if (sub->qoss->count == 1 && *(int*)(sub->qoss->first->content) >= MQTTREASONCODE_UNSPECIFIED_ERROR)
2856  {
2857  if (command->command.onFailure5)
2858  {
2860 
2861  data.token = command->command.token;
2862  data.reasonCode = *(int*)(sub->qoss->first->content);
2863  data.message = NULL;
2864  data.properties = sub->properties;
2865  Log(TRACE_MIN, -1, "Calling subscribe failure for client %s", m->c->clientID);
2866  (*(command->command.onFailure5))(command->command.context, &data);
2867  }
2868  }
2869  else if (command->command.onSuccess5)
2870  {
2872  enum MQTTReasonCodes* array = NULL;
2873 
2874  data.reasonCode = *(int*)(sub->qoss->first->content);
2875  data.alt.sub.reasonCodeCount = sub->qoss->count;
2876  if (sub->qoss->count > 1)
2877  {
2878  ListElement* cur_qos = NULL;
2879  enum MQTTReasonCodes* element = array = data.alt.sub.reasonCodes = malloc(sub->qoss->count * sizeof(enum MQTTReasonCodes));
2880  if (array)
2881  while (ListNextElement(sub->qoss, &cur_qos))
2882  *element++ = *(int*)(cur_qos->content);
2883  }
2884  data.token = command->command.token;
2885  data.properties = sub->properties;
2886  Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID);
2887  (*(command->command.onSuccess5))(command->command.context, &data);
2888  if (array)
2889  free(array);
2890  }
2891  }
2892  else if (sub->qoss->count == 1 && *(int*)(sub->qoss->first->content) == MQTT_BAD_SUBSCRIBE)
2893  {
2894  if (command->command.onFailure)
2895  {
2897 
2898  data.token = command->command.token;
2899  data.code = *(int*)(sub->qoss->first->content);
2900  data.message = NULL;
2901  Log(TRACE_MIN, -1, "Calling subscribe failure for client %s", m->c->clientID);
2902  (*(command->command.onFailure))(command->command.context, &data);
2903  }
2904  }
2905  else if (command->command.onSuccess)
2906  {
2908  int* array = NULL;
2909 
2910  if (sub->qoss->count == 1)
2911  data.alt.qos = *(int*)(sub->qoss->first->content);
2912  else if (sub->qoss->count > 1)
2913  {
2914  ListElement* cur_qos = NULL;
2915  int* element = array = data.alt.qosList = malloc(sub->qoss->count * sizeof(int));
2916  if (array)
2917  while (ListNextElement(sub->qoss, &cur_qos))
2918  *element++ = *(int*)(cur_qos->content);
2919  }
2920  data.token = command->command.token;
2921  Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID);
2922  (*(command->command.onSuccess))(command->command.context, &data);
2923  if (array)
2924  free(array);
2925  }
2926  MQTTAsync_freeCommand(command);
2927  break;
2928  }
2929  }
2930  rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
2931  }
2932  else if (pack->header.bits.type == UNSUBACK)
2933  {
2934  ListElement* current = NULL;
2935  Unsuback* unsub = (Unsuback*)pack;
2936 
2937  /* use the msgid to find the callback to be called */
2938  while (ListNextElement(m->responses, &current))
2939  {
2941  if (command->command.token == ((Unsuback*)pack)->msgId)
2942  {
2943  if (!ListDetach(m->responses, command)) /* remove the response from the list */
2944  Log(LOG_ERROR, -1, "Unsubscribe command not removed from command list");
2945  if (command->command.onSuccess || command->command.onSuccess5)
2946  {
2947  Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID);
2948  if (command->command.onSuccess)
2949  {
2951 
2952  memset(&data, '\0', sizeof(data));
2953  data.token = command->command.token;
2954  (*(command->command.onSuccess))(command->command.context, &data);
2955  }
2956  else
2957  {
2959  enum MQTTReasonCodes* array = NULL;
2960 
2961  data.reasonCode = *(enum MQTTReasonCodes*)(unsub->reasonCodes->first->content);
2962  data.alt.unsub.reasonCodeCount = unsub->reasonCodes->count;
2963  if (unsub->reasonCodes->count > 1)
2964  {
2965  ListElement* cur_rc = NULL;
2966  enum MQTTReasonCodes* element = array = data.alt.unsub.reasonCodes = malloc(unsub->reasonCodes->count * sizeof(enum MQTTReasonCodes));
2967  if (array)
2968  while (ListNextElement(unsub->reasonCodes, &cur_rc))
2969  *element++ = *(enum MQTTReasonCodes*)(cur_rc->content);
2970  }
2971  data.token = command->command.token;
2972  data.properties = unsub->properties;
2973  Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID);
2974  (*(command->command.onSuccess5))(command->command.context, &data);
2975  if (array)
2976  free(array);
2977  }
2978  }
2979  MQTTAsync_freeCommand(command);
2980  break;
2981  }
2982  }
2983  rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
2984  }
2985  else if (pack->header.bits.type == DISCONNECT)
2986  {
2987  Ack* disc = (Ack*)pack;
2988 
2989  if (m->disconnected)
2990  {
2991  Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
2992  (*(m->disconnected))(m->disconnected_context, &disc->properties, disc->rc);
2993  }
2994  MQTTPacket_freeAck(disc);
2995  }
2996  }
2997  }
2998  }
3000  receiveThread_id = 0;
3002 #if !defined(_WIN32) && !defined(_WIN64)
3003  if (sendThread_state != STOPPED)
3004  Thread_signal_cond(send_cond);
3005 #else
3006  if (sendThread_state != STOPPED)
3007  Thread_post_sem(send_sem);
3008 #endif
3009  FUNC_EXIT;
3010 #if defined(_WIN32) || defined(_WIN64)
3011  ExitThread(0);
3012 #endif
3013  return 0;
3014 }
3015 
3016 
/*
 * Ask the background threads to stop when no client is connected or connecting.
 * Counts the entries of the handles list whose connect_state is greater than NOT_IN_PROGRESS
 * or whose connected flag is set. When that count is zero, tostop is set to 1 and the function
 * polls, sleeping via MQTTAsync_sleep(100L) at most 99 times, until both sendThread_state and
 * receiveThread_state are STOPPED; tostop is then reset to 0.
 *
 * NOTE(review): this listing omits some lines of the function (the condition opening the
 * outer block and the statements around the sleep) - check against the complete file.
 */
3017 static void MQTTAsync_stop(void)
3018 {
 /* rc is only declared when stack tracing is compiled in; it is the value given to FUNC_EXIT_RC */
3019 #if !defined(NOSTACKTRACE)
3020  int rc = 0;
3021 #endif
3022 
3023  FUNC_ENTRY;
3025  {
3026  int conn_count = 0;
3027  ListElement* current = NULL;
3028 
3029  if (handles != NULL)
3030  {
3031  /* find out how many handles are still connected */
3032  while (ListNextElement(handles, &current))
3033  {
3034  if (((MQTTAsyncs*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
3035  ((MQTTAsyncs*)(current->content))->c->connected)
3036  ++conn_count;
3037  }
3038  }
3039  Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
3040  /* stop the background thread, if we are the last one to be using it */
3041  if (conn_count == 0)
3042  {
3043  int count = 0;
3044  tostop = 1;
 /* bounded wait: the loop gives up once count reaches 100 even if a thread has not stopped */
3045  while ((sendThread_state != STOPPED || receiveThread_state != STOPPED) && ++count < 100)
3046  {
3048  Log(TRACE_MIN, -1, "sleeping");
3049  MQTTAsync_sleep(100L);
3051  }
3052 #if !defined(NOSTACKTRACE)
3053  rc = 1;
3054 #endif
3055  tostop = 0;
3056  }
3057  }
3058  FUNC_EXIT_RC(rc);
3059 }
3060 
3061 
3066 {
3067  int rc = MQTTASYNC_SUCCESS;
3068  MQTTAsyncs* m = handle;
3069 
3070  FUNC_ENTRY;
3072 
3073  if (m == NULL || ma == NULL || m->c == NULL || m->c->connect_state != NOT_IN_PROGRESS)
3074  rc = MQTTASYNC_FAILURE;
3075  else
3076  {
3077  m->clContext = m->maContext = m->dcContext = context;
3078  m->cl = cl;
3079  m->ma = ma;
3080  m->dc = dc;
3081  }
3082 
3084  FUNC_EXIT_RC(rc);
3085  return rc;
3086 }
3087 
3090 {
3091  int rc = MQTTASYNC_SUCCESS;
3092  MQTTAsyncs* m = handle;
3093 
3094  FUNC_ENTRY;
3096 
3097  if (m == NULL || m->c->connect_state != 0)
3098  rc = MQTTASYNC_FAILURE;
3099  else
3100  {
3101  m->clContext = context;
3102  m->cl = cl;
3103  }
3104 
3106  FUNC_EXIT_RC(rc);
3107  return rc;
3108 }
3109 
3110 
3113 {
3114  int rc = MQTTASYNC_SUCCESS;
3115  MQTTAsyncs* m = handle;
3116 
3117  FUNC_ENTRY;
3119 
3120  if (m == NULL || ma == NULL || m->c->connect_state != 0)
3121  rc = MQTTASYNC_FAILURE;
3122  else
3123  {
3124  m->maContext = context;
3125  m->ma = ma;
3126  }
3127 
3129  FUNC_EXIT_RC(rc);
3130  return rc;
3131 }
3132 
3135 {
3136  int rc = MQTTASYNC_SUCCESS;
3137  MQTTAsyncs* m = handle;
3138 
3139  FUNC_ENTRY;
3141 
3142  if (m == NULL || m->c->connect_state != 0)
3143  rc = MQTTASYNC_FAILURE;
3144  else
3145  {
3146  m->dcContext = context;
3147  m->dc = dc;
3148  }
3149 
3151  FUNC_EXIT_RC(rc);
3152  return rc;
3153 }
3154 
3155 
3157 {
3158  int rc = MQTTASYNC_SUCCESS;
3159  MQTTAsyncs* m = handle;
3160 
3161  FUNC_ENTRY;
3163 
3164  if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
3165  rc = MQTTASYNC_FAILURE;
3166  else
3167  {
3170  }
3171 
3173  FUNC_EXIT_RC(rc);
3174  return rc;
3175 }
3176 
3177 
3179 {
3180  int rc = MQTTASYNC_SUCCESS;
3181  MQTTAsyncs* m = handle;
3182 
3183  FUNC_ENTRY;
3185 
3186  if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
3187  rc = MQTTASYNC_FAILURE;
3188  else
3189  {
3191  m->connected = connected;
3192  }
3193 
3195  FUNC_EXIT_RC(rc);
3196  return rc;
3197 }
3198 
3199 
3201 {
3202  int rc = MQTTASYNC_SUCCESS;
3203  MQTTAsyncs* m = handle;
3204 
3205  FUNC_ENTRY;
3207 
3208  if (m == NULL)
3209  rc = MQTTASYNC_FAILURE;
3210  else
3211  {
3213  m->updateConnectOptions = updateOptions;
3214  }
3215 
3217  FUNC_EXIT_RC(rc);
3218  return rc;
3219 }
3220 
3221 
3223 {
3224  int rc = MQTTASYNC_SUCCESS;
3225  MQTTAsyncs* m = handle;
3226 
3227  FUNC_ENTRY;
3229 
3230  if (m == NULL)
3231  rc = MQTTASYNC_FAILURE;
3232  else
3233  {
3234  m->c->beforeWrite = co;
3236  }
3237 
3239  FUNC_EXIT_RC(rc);
3240  return rc;
3241 }
3242 
3243 
3245 {
3246  int rc = MQTTASYNC_SUCCESS;
3247  MQTTAsyncs* m = handle;
3248 
3249  FUNC_ENTRY;
3251 
3252  if (m == NULL)
3253  rc = MQTTASYNC_FAILURE;
3254  else
3255  {
3256  m->c->afterRead = co;
3257  m->c->afterRead_context = context;
3258  }
3259 
3261  FUNC_EXIT_RC(rc);
3262  return rc;
3263 }
3264 
3265 
3267 {
3268  FUNC_ENTRY;
3269  client->good = 0;
3270  client->ping_outstanding = 0;
3271  if (client->net.socket > 0)
3272  {
3274  if (client->connected && Socket_noPendingWrites(client->net.socket))
3275  MQTTPacket_send_disconnect(client, reasonCode, props);
3277  WebSocket_close(&client->net, WebSocket_CLOSE_NORMAL, NULL);
3278 #if defined(OPENSSL)
3279  SSL_SESSION_free(client->session); /* is a no-op if session is NULL */
3280  client->session = NULL; /* show the session has been freed */
3281  SSLSocket_close(&client->net);
3282 #endif
3283  Socket_close(client->net.socket);
3284  client->net.socket = 0;
3285 #if defined(OPENSSL)
3286  client->net.ssl = NULL;
3287 #endif
3289  }
3290  client->connected = 0;
3291  client->connect_state = NOT_IN_PROGRESS;
3292  FUNC_EXIT;
3293 }
3294 
3295 
3297 {
3298  FUNC_ENTRY;
3299  MQTTAsync_closeOnly(client, reasonCode, props);
3300 
3301  if (client->cleansession ||
3302  (client->MQTTVersion >= MQTTVERSION_5 && client->sessionExpiry == 0))
3303  MQTTAsync_cleanSession(client);
3304 
3305  FUNC_EXIT;
3306 }
3307 
3308 
3315 static int clientStructCompare(void* a, void* b)
3316 {
3317  MQTTAsyncs* m = (MQTTAsyncs*)a;
3318  return m->c == (Clients*)b;
3319 }
3320 
3321 
3329 {
3330  int rc = 0;
3331  ListElement* found = NULL;
3332 
3333  FUNC_ENTRY;
3334 #if !defined(NO_PERSISTENCE)
3336 #endif
3339  client->msgID = 0;
3340  if ((found = ListFindItem(handles, client, clientStructCompare)) != NULL)
3341  {
3342  MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
3344  }
3345  else
3346  Log(LOG_ERROR, -1, "cleanSession: did not find client structure in handles list");
3347  FUNC_EXIT_RC(rc);
3348  return rc;
3349 }
3350 
3351 
3352 /*
3353 static int MQTTAsync_freeSession(Clients* client)
3354 {
3355  int rc = 0;
3356  ListElement* found = NULL;
3357 
3358  FUNC_ENTRY;
3359  MQTTAsync_emptyMessageQueue(client);
3360 
3361  if ((found = ListFindItem(handles, client, clientStructCompare)) != NULL)
3362  {
3363  MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
3364  MQTTAsync_freeCommands(m);
3365  }
3366  else
3367  Log(LOG_ERROR, -1, "freeSession: did not find client structure in handles list");
3368  FUNC_EXIT_RC(rc);
3369  return rc;
3370 }*/
3371 
3372 
3373 /*
3374 * Deliver a message to the messageArrived callback
3375 * @param m a client structure
3376 * @param topicName the name of the topic on which the message is being delivered
3377 * @param topicLen the length of the topic name string
3378 * @param mm the message to be delivered
3379 * @return boolean 1 means message has been delivered, 0 that it has not
3380 */
3381 static int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topicLen, MQTTAsync_message* mm)
3382 {
3383  int rc;
3384 
3385  Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
3386  m->c->clientID, m->c->messageQueue->count);
3387  rc = (*(m->ma))(m->maContext, topicName, (int)topicLen, mm);
3388  /* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
3389  * the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
3390  * so we must be careful how we use it.
3391  */
3392  return rc;
3393 }
3394 
3395 
/*
 * Turn an incoming Publish into an MQTTAsync_message and either deliver it straight away
 * or append it to the client's messageQueue.
 * Immediate delivery is only attempted when the queue is empty, the client is connected and
 * a matching entry with a messageArrived callback is found in the handles list; in every
 * other case, or when the callback returns 0, the message is queued.
 * @param publish the received publication; its topic pointer is set to NULL before returning
 * @param client the client the publication is for
 * @param allocatePayload non-zero to copy the payload into newly allocated memory, zero to
 * reuse publish->payload directly
 *
 * NOTE(review): this listing omits some lines of the function (the declaration of
 * initialized and the statement guarded by the persistence check) - check against the
 * complete file before changing it.
 */
3396 void Protocol_processPublication(Publish* publish, Clients* client, int allocatePayload)
3397 {
3398  MQTTAsync_message* mm = NULL;
3400  int rc = 0;
3401 
3402  FUNC_ENTRY;
3403  if ((mm = malloc(sizeof(MQTTAsync_message))) == NULL)
3404  goto exit;
3405  memcpy(mm, &initialized, sizeof(MQTTAsync_message));
3406 
3407  if (allocatePayload)
3408  {
3409  if ((mm->payload = malloc(publish->payloadlen)) == NULL)
3410  {
3411  free(mm);
3412  goto exit;
3413  }
3414  memcpy(mm->payload, publish->payload, publish->payloadlen);
3415  } else
3416  mm->payload = publish->payload;
3417  mm->payloadlen = publish->payloadlen;
3418  mm->qos = publish->header.bits.qos;
3419  mm->retained = publish->header.bits.retain;
3420  if (publish->header.bits.qos == 2)
3421  mm->dup = 0; /* ensure that a QoS2 message is not passed to the application with dup = 1 */
3422  else
3423  mm->dup = publish->header.bits.dup;
3424  mm->msgid = publish->msgId;
3425 
 /* properties are only copied for MQTT version 5 and above */
3426  if (publish->MQTTVersion >= MQTTVERSION_5)
3427  mm->properties = MQTTProperties_copy(&publish->properties);
3428 
 /* deliver directly only when nothing is already queued ahead of this message */
3429  if (client->messageQueue->count == 0 && client->connected)
3430  {
3431  ListElement* found = NULL;
3432 
3433  if ((found = ListFindItem(handles, client, clientStructCompare)) == NULL)
3434  Log(LOG_ERROR, -1, "processPublication: did not find client structure in handles list");
3435  else
3436  {
3437  MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
3438 
3439  if (m->ma)
3440  rc = MQTTAsync_deliverMessage(m, publish->topic, publish->topiclen, mm);
3441  }
3442  }
3443 
3444  if (rc == 0) /* if message was not delivered, queue it up */
3445  {
3446  qEntry* qe = malloc(sizeof(qEntry));
3447 
 /* NOTE(review): on this path mm (and any payload copy) is not freed in the visible code */
3448  if (!qe)
3449  goto exit;
3450  qe->msg = mm;
 /* the queue entry keeps the topic string pointer; publish->topic is cleared at exit */
3451  qe->topicName = publish->topic;
3452  qe->topicLen = publish->topiclen;
3453  ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
3454 #if !defined(NO_PERSISTENCE)
3455  if (client->persistence)
3457 #endif
3458  }
3459 exit:
 /* reached on every path, including the allocation failures above */
3460  publish->topic = NULL;
3461  FUNC_EXIT;
3462 }
3463 
3464 
/* Current retry loop interval; starts at 5 and is only ever lowered by setRetryLoopInterval(). */
static int retryLoopInterval = 5;

/**
 * Lower the retry loop interval if a tenth of the given keepalive value is smaller
 * than the interval currently in force.
 * @param keepalive the keepalive value; one tenth of it (integer division), limited to
 * the range 1 to 5, is the candidate interval
 */
static void setRetryLoopInterval(int keepalive)
{
	int candidate = keepalive / 10;

	/* keep the candidate within 1..5 */
	candidate = (candidate < 1) ? 1 : ((candidate > 5) ? 5 : candidate);

	/* the interval is never raised, only reduced */
	if (retryLoopInterval > candidate)
		retryLoopInterval = candidate;
}
3478 
3479 
3481 {
3482  MQTTAsyncs* m = handle;
3483  int rc = MQTTASYNC_SUCCESS;
3485  thread_id_type thread_id = 0;
3486  int locked = 0;
3487 
3488  FUNC_ENTRY;
3489  if (options == NULL)
3490  {
3492  goto exit;
3493  }
3494 
3495  if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 7)
3496  {
3498  goto exit;
3499  }
3500 
3501 #if defined(OPENSSL)
3502  if (m->ssl && options->ssl == NULL)
3503  {
3505  goto exit;
3506  }
3507 #endif
3508 
3509  if (options->will) /* check validity of will options structure */
3510  {
3511  if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
3512  {
3514  goto exit;
3515  }
3516  if (options->will->qos < 0 || options->will->qos > 2)
3517  {
3518  rc = MQTTASYNC_BAD_QOS;
3519  goto exit;
3520  }
3521  if (options->will->topicName == NULL)
3522  {
3524  goto exit;
3525  } else if (strlen(options->will->topicName) == 0)
3526  {
3528  goto exit;
3529  }
3530  }
3531  if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
3532  {
3533  if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 5)
3534  {
3536  goto exit;
3537  }
3538  }
3539  if (options->MQTTVersion >= MQTTVERSION_5 && m->c->MQTTVersion < MQTTVERSION_5)
3540  {
3542  goto exit;
3543  }
3544  if ((options->username && !UTF8_validateString(options->username)) ||
3545  (options->password && !UTF8_validateString(options->password)))
3546  {
3548  goto exit;
3549  }
3550  if (options->MQTTVersion >= MQTTVERSION_5 && options->struct_version < 6)
3551  {
3553  goto exit;
3554  }
3555  if (options->MQTTVersion >= MQTTVERSION_5 && options->cleansession != 0)
3556  {
3558  goto exit;
3559  }
3560  if (options->MQTTVersion < MQTTVERSION_5 && options->struct_version >= 6)
3561  {
3562  if (options->cleanstart != 0 || options->onFailure5 || options->onSuccess5 ||
3563  options->connectProperties || options->willProperties)
3564  {
3566  goto exit;
3567  }
3568  }
3569 
3570  m->connect.onSuccess = options->onSuccess;
3571  m->connect.onFailure = options->onFailure;
3572  if (options->struct_version >= 6)
3573  {
3574  m->connect.onSuccess5 = options->onSuccess5;
3575  m->connect.onFailure5 = options->onFailure5;
3576  }
3577  m->connect.context = options->context;
3578  m->connectTimeout = options->connectTimeout;
3579 
3580  tostop = 0;
3581 
3582  /* don't lock async mutex if we are being called from a callback */
3583  thread_id = Thread_getid();
3584  if (thread_id != sendThread_id && thread_id != receiveThread_id)
3585  {
3587  locked = 1;
3588  }
3590  {
3593  }
3595  {
3598  }
3599  if (locked)
3601 
3602  m->c->keepAliveInterval = options->keepAliveInterval;
3604  m->c->cleansession = options->cleansession;
3605  m->c->maxInflightMessages = options->maxInflight;
3606  if (options->struct_version >= 3)
3607  m->c->MQTTVersion = options->MQTTVersion;
3608  else
3610  if (options->struct_version >= 4)
3611  {
3612  m->automaticReconnect = options->automaticReconnect;
3613  m->minRetryInterval = options->minRetryInterval;
3614  m->maxRetryInterval = options->maxRetryInterval;
3615  }
3616  if (options->struct_version >= 7)
3617  {
3618  m->c->net.httpHeaders = (const MQTTClient_nameValue *) options->httpHeaders;
3619  }
3620 
3621  if (m->c->will)
3622  {
3623  free(m->c->will->payload);
3624  free(m->c->will->topic);
3625  free(m->c->will);
3626  m->c->will = NULL;
3627  }
3628 
3629  if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
3630  {
3631  const void* source = NULL;
3632 
3633  if ((m->c->will = malloc(sizeof(willMessages))) == NULL)
3634  {
3635  rc = PAHO_MEMORY_ERROR;
3636  goto exit;
3637  }
3638  if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
3639  {
3640  if (options->will->struct_version == 1 && options->will->payload.data)
3641  {
3642  m->c->will->payloadlen = options->will->payload.len;
3643  source = options->will->payload.data;
3644  }
3645  else
3646  {
3647  m->c->will->payloadlen = (int)strlen(options->will->message);
3648  source = (void*)options->will->message;
3649  }
3650  if ((m->c->will->payload = malloc(m->c->will->payloadlen)) == NULL)
3651  {
3652  rc = PAHO_MEMORY_ERROR;
3653  goto exit;
3654  }
3655  memcpy(m->c->will->payload, source, m->c->will->payloadlen);
3656  }
3657  else
3658  {
3659  m->c->will->payload = NULL;
3660  m->c->will->payloadlen = 0;
3661  }
3662  m->c->will->qos = options->will->qos;
3663  m->c->will->retained = options->will->retained;
3664  m->c->will->topic = MQTTStrdup(options->will->topicName);
3665  }
3666 
3667 #if defined(OPENSSL)
3668  if (m->c->sslopts)
3669  {
3670  if (m->c->sslopts->trustStore)
3671  free((void*)m->c->sslopts->trustStore);
3672  if (m->c->sslopts->keyStore)
3673  free((void*)m->c->sslopts->keyStore);
3674  if (m->c->sslopts->privateKey)
3675  free((void*)m->c->sslopts->privateKey);
3676  if (m->c->sslopts->privateKeyPassword)
3677  free((void*)m->c->sslopts->privateKeyPassword);
3678  if (m->c->sslopts->enabledCipherSuites)
3679  free((void*)m->c->sslopts->enabledCipherSuites);
3680  if (m->c->sslopts->struct_version >= 2)
3681  {
3682  if (m->c->sslopts->CApath)
3683  free((void*)m->c->sslopts->CApath);
3684  }
3685  free((void*)m->c->sslopts);
3686  m->c->sslopts = NULL;
3687  }
3688 
3689  if (options->struct_version != 0 && options->ssl)
3690  {
3691  if ((m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions))) == NULL)
3692  {
3693  rc = PAHO_MEMORY_ERROR;
3694  goto exit;
3695  }
3696  memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
3697  m->c->sslopts->struct_version = options->ssl->struct_version;
3698  if (options->ssl->trustStore)
3699  m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
3700  if (options->ssl->keyStore)
3701  m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
3702  if (options->ssl->privateKey)
3703  m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
3704  if (options->ssl->privateKeyPassword)
3705  m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
3706  if (options->ssl->enabledCipherSuites)
3707  m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
3708  m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
3709  if (m->c->sslopts->struct_version >= 1)
3710  m->c->sslopts->sslVersion = options->ssl->sslVersion;
3711  if (m->c->sslopts->struct_version >= 2)
3712  {
3713  m->c->sslopts->verify = options->ssl->verify;
3714  if (options->ssl->CApath)
3715  m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
3716  }
3717  if (m->c->sslopts->struct_version >= 3)
3718  {
3719  m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
3720  m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
3721  }
3722  if (m->c->sslopts->struct_version >= 4)
3723  {
3724  m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
3725  m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
3726  m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
3727  }
3728  if (m->c->sslopts->struct_version >= 5)
3729  {
3730  m->c->sslopts->protos = options->ssl->protos;
3731  m->c->sslopts->protos_len = options->ssl->protos_len;
3732  }
3733  }
3734 #else
3735  if (options->struct_version != 0 && options->ssl)
3736  {
3738  goto exit;
3739  }
3740 #endif
3741 
3742  if (m->c->username)
3743  free((void*)m->c->username);
3744  if (options->username)
3745  m->c->username = MQTTStrdup(options->username);
3746  if (m->c->password)
3747  free((void*)m->c->password);
3748  if (options->password)
3749  {
3750  m->c->password = MQTTStrdup(options->password);
3751  m->c->passwordlen = (int)strlen(options->password);
3752  }
3753  else if (options->struct_version >= 5 && options->binarypwd.data)
3754  {
3755  m->c->passwordlen = options->binarypwd.len;
3756  if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
3757  {
3758  rc = PAHO_MEMORY_ERROR;
3759  goto exit;
3760  }
3761  memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
3762  }
3763 
3764  m->c->retryInterval = options->retryInterval;
3765  m->shouldBeConnected = 1;
3766 
3767  m->connectTimeout = options->connectTimeout;
3768 
3770  if (options->struct_version >= 2 && options->serverURIcount > 0)
3771  {
3772  int i;
3773 
3774  m->serverURIcount = options->serverURIcount;
3775  if ((m->serverURIs = malloc(options->serverURIcount * sizeof(char*))) == NULL)
3776  {
3777  rc = PAHO_MEMORY_ERROR;
3778  goto exit;
3779  }
3780  for (i = 0; i < options->serverURIcount; ++i)
3781  m->serverURIs[i] = MQTTStrdup(options->serverURIs[i]);
3782  }
3783 
3784  if (m->connectProps)
3785  {
3787  free(m->connectProps);
3788  m->connectProps = NULL;
3789  }
3790  if (m->willProps)
3791  {
3793  free(m->willProps);
3794  m->willProps = NULL;
3795  }
3796  if (options->struct_version >=6)
3797  {
3798  if (options->connectProperties)
3799  {
3801 
3802  if ((m->connectProps = malloc(sizeof(MQTTProperties))) == NULL)
3803  {
3804  rc = PAHO_MEMORY_ERROR;
3805  goto exit;
3806  }
3807  *m->connectProps = initialized;
3809 
3813 
3814  }
3815  if (options->willProperties)
3816  {
3818 
3819  if ((m->willProps = malloc(sizeof(MQTTProperties))) == NULL)
3820  {
3821  rc = PAHO_MEMORY_ERROR;
3822  goto exit;
3823  }
3824  *m->willProps = initialized;
3826  }
3827  m->c->cleanstart = options->cleanstart;
3828  }
3829 
3830  /* Add connect request to operation queue */
3831  if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
3832  {
3833  rc = PAHO_MEMORY_ERROR;
3834  goto exit;
3835  }
3836  memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
3837  conn->client = m;
3838  if (options)
3839  {
3840  conn->command.onSuccess = options->onSuccess;
3841  conn->command.onFailure = options->onFailure;
3842  conn->command.onSuccess5 = options->onSuccess5;
3843  conn->command.onFailure5 = options->onFailure5;
3844  conn->command.context = options->context;
3845  }
3846  conn->command.type = CONNECT;
3847  conn->command.details.conn.currentURI = 0;
3848  rc = MQTTAsync_addCommand(conn, sizeof(conn));
3849 
3850 exit:
3851  FUNC_EXIT_RC(rc);
3852  return rc;
3853 }
3854 
3855 
/**
 * Shared implementation behind the disconnect entry points: checks the handle
 * and queues a DISCONNECT command for the client.
 * @param handle the client handle, used as an MQTTAsyncs pointer
 * @param options optional disconnect options; when non-NULL its callbacks,
 * context and timeout are copied into the queued command
 * @param internal stored in the queued command; when 0 the client's
 * shouldBeConnected flag is also cleared
 * @return MQTTASYNC_FAILURE for a NULL handle or client structure,
 * PAHO_MEMORY_ERROR if the command cannot be allocated, otherwise the result
 * of MQTTAsync_addCommand (the code for the not-connected case is set on a
 * line that is not visible in this listing)
 */
3856  static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* options, int internal)
3857 {
3858  MQTTAsyncs* m = handle;
3859  int rc = MQTTASYNC_SUCCESS;
/* NOTE(review): the declaration of `dis` (listing line 3860) is missing from this listing. */
3861 
3862  FUNC_ENTRY;
3863  if (m == NULL || m->c == NULL)
3864  {
3865  rc = MQTTASYNC_FAILURE;
3866  goto exit;
3867  }
/* a non-internal call clears shouldBeConnected, even if the client turns out not to be connected */
3868  if (!internal)
3869  m->shouldBeConnected = 0;
3870  if (m->c->connected == 0)
3871  {
/* NOTE(review): listing line 3872 is missing; presumably it sets rc for the not-connected case - confirm. */
3873  goto exit;
3874  }
3875 
3876  /* Add disconnect request to operation queue */
3877  if ((dis = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
3878  {
3879  rc = PAHO_MEMORY_ERROR;
3880  goto exit;
3881  }
3882  memset(dis, '\0', sizeof(MQTTAsync_queuedCommand));
3883  dis->client = m;
3884  if (options)
3885  {
3886  dis->command.onSuccess = options->onSuccess;
3887  dis->command.onFailure = options->onFailure;
3888  dis->command.onSuccess5 = options->onSuccess5;
3889  dis->command.onFailure5 = options->onFailure5;
3890  dis->command.context = options->context;
3891  dis->command.details.dis.timeout = options->timeout;
/* the reason code is only taken from options for an MQTT 5 client and struct_version >= 1 */
3892  if (m->c->MQTTVersion >= MQTTVERSION_5 && options->struct_version >= 1)
3893  {
/* NOTE(review): listing line 3894 is missing here. */
3895  dis->command.details.dis.reasonCode = options->reasonCode;
3896  }
3897  }
3898  dis->command.type = DISCONNECT;
3899  dis->command.details.dis.internal = internal;
/* note: sizeof(dis) is the size of the pointer, not of the command structure */
3900  rc = MQTTAsync_addCommand(dis, sizeof(dis));
3901 
3902 exit:
3903  FUNC_EXIT_RC(rc);
3904  return rc;
3905 }
3906 
3907 
/**
 * Request a disconnect with the internal flag set: fills in the timeout of a
 * local options structure and calls MQTTAsync_disconnect1 with internal = 1.
 * @param handle the client handle, passed through to MQTTAsync_disconnect1
 * @param timeout the value stored in options.timeout
 * @return the result of MQTTAsync_disconnect1
 */
3908 static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout)
3909 {
/* NOTE(review): the declaration/initialisation of `options` (listing line 3910) is missing from this listing. */
3911 
3912  options.timeout = timeout;
3913  return MQTTAsync_disconnect1(handle, &options, 1);
3914 }
3915 
3916 
3918 {
3920 }
3921 
3922 
3924 {
3925  if (options != NULL && (strncmp(options->struct_id, "MQTD", 4) != 0 || options->struct_version < 0 || options->struct_version > 1))
3926  return MQTTASYNC_BAD_STRUCTURE;
3927  else
3928  return MQTTAsync_disconnect1(handle, options, 0);
3929 }
3930 
3931 
3933 {
3934  MQTTAsyncs* m = handle;
3935  int rc = 0;
3936 
3937  FUNC_ENTRY;
3939  if (m && m->c)
3940  rc = m->c->connected;
3942  FUNC_EXIT_RC(rc);
3943  return rc;
3944 }
3945 
3946 
/**
 * Comparison callback passed to ListFindItem when searching the commands list
 * for a message id.
 * @param a the list item; presumably the source of `cmd` - its declaration is
 * not visible in this listing
 * @param b pointer to the int message id being searched for
 * @return non-zero when cmd->command.token equals the int pointed to by b
 */
3947 static int cmdMessageIDCompare(void* a, void* b)
3948 {
/* NOTE(review): the declaration of `cmd` (listing line 3949) is missing from this listing. */
3950  return cmd->command.token == *(int*)b;
3951 }
3952 
3953 
3961 {
3962  int start_msgid;
3963  int msgid;
3964  thread_id_type thread_id = 0;
3965  int locked = 0;
3966 
3967  /* need to check: commands list and response list for a client */
3968  FUNC_ENTRY;
3969  /* We might be called in a callback. In which case, this mutex will be already locked. */
3970  thread_id = Thread_getid();
3971  if (thread_id != sendThread_id && thread_id != receiveThread_id)
3972  {
3974  locked = 1;
3975  }
3976 
3977  /* Fetch last message ID in locked state */
3978  start_msgid = m->c->msgID;
3979  msgid = start_msgid;
3980 
3982  msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
3983  while (ListFindItem(commands, &msgid, cmdMessageIDCompare) ||
3984  ListFindItem(m->c->outboundMsgs, &msgid, messageIDCompare) ||
3986  {
3987  msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
3988  if (msgid == start_msgid)
3989  { /* we've tried them all - none free */
3990  msgid = 0;
3991  break;
3992  }
3993  }
3995  if (msgid != 0)
3996  m->c->msgID = msgid;
3997  if (locked)
3999  FUNC_EXIT_RC(msgid);
4000  return msgid;
4001 }
4002 
4003 
/**
 * Validate a multi-topic subscribe request and queue a SUBSCRIBE command for it.
 * @param handle the client handle, used as an MQTTAsyncs pointer
 * @param count number of entries in the topic and qos arrays
 * @param topic array of topic strings; each is checked with UTF8_validateString
 * and duplicated into the queued command
 * @param qos array of requested QoS values; each must be in the range 0..2
 * @param response optional response options; its callbacks and context are
 * copied into the command and its token field is set to the assigned message id
 * @return MQTTASYNC_FAILURE for a NULL handle or client structure,
 * MQTTASYNC_BAD_QOS for an out-of-range QoS, PAHO_MEMORY_ERROR on allocation
 * failure, otherwise the result of MQTTAsync_addCommand. Several other error
 * codes are assigned on lines that are not visible in this listing.
 */
4004 int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int* qos, MQTTAsync_responseOptions* response)
4005 {
4006  MQTTAsyncs* m = handle;
4007  int i = 0;
4008  int rc = MQTTASYNC_SUCCESS;
/* NOTE(review): the declaration of `sub` (listing line 4009) is missing from this listing. */
4010  int msgid = 0;
4011 
4012  FUNC_ENTRY;
4013  if (m == NULL || m->c == NULL)
4014  rc = MQTTASYNC_FAILURE;
4015  else if (m->c->connected == 0)
/* NOTE(review): listing line 4016 (the body of the branch above) is missing. */
4017  else for (i = 0; i < count; i++)
4018  {
4019  if (!UTF8_validateString(topic[i]))
4020  {
/* NOTE(review): listing line 4021 is missing; presumably it sets rc - confirm. */
4022  break;
4023  }
4024  if (qos[i] < 0 || qos[i] > 2)
4025  {
4026  rc = MQTTASYNC_BAD_QOS;
4027  break;
4028  }
4029  }
4030  if (rc != MQTTASYNC_SUCCESS)
4031  ; /* don't overwrite a previous error code */
4032  else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
/* NOTE(review): listing line 4033 (the body of the branch above) is missing. */
/* NOTE(review): response is dereferenced in the next condition before the `else if (response)`
 * NULL test that follows it; a NULL response with an MQTT 5 client and count > 1 reaches it. */
4034  else if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && (count != response->subscribeOptionsCount
4035  && response->subscribeOptionsCount != 0))
/* NOTE(review): listing line 4036 (the body of the branch above) is missing. */
4037  else if (response)
4038  {
/* check that the callbacks supplied match the client's MQTT version */
4039  if (m->c->MQTTVersion >= MQTTVERSION_5)
4040  {
4041  if (response->struct_version == 0 || response->onFailure || response->onSuccess)
/* NOTE(review): listing line 4042 (the body of the test above) is missing. */
4043  }
4044  else if (m->c->MQTTVersion < MQTTVERSION_5)
4045  {
4046  if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
/* NOTE(review): listing line 4047 (the body of the test above) is missing. */
4048  }
4049  }
4050  if (rc != MQTTASYNC_SUCCESS)
4051  goto exit;
4052 
4053  /* Add subscribe request to operation queue */
4054  if ((sub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
4055  {
4056  rc = PAHO_MEMORY_ERROR;
4057  goto exit;
4058  }
4059  memset(sub, '\0', sizeof(MQTTAsync_queuedCommand));
4060  sub->client = m;
4061  sub->command.token = msgid;
4062  if (response)
4063  {
4064  sub->command.onSuccess = response->onSuccess;
4065  sub->command.onFailure = response->onFailure;
4066  sub->command.onSuccess5 = response->onSuccess5;
4067  sub->command.onFailure5 = response->onFailure5;
4068  sub->command.context = response->context;
/* hand the assigned message id back to the caller through the response structure */
4069  response->token = sub->command.token;
4070  if (m->c->MQTTVersion >= MQTTVERSION_5)
4071  {
4072  sub->command.properties = MQTTProperties_copy(&response->properties);
4073  sub->command.details.sub.opts = response->subscribeOptions;
/* for more than one topic, a per-topic options list of count entries is built */
4074  if (count > 1)
4075  {
4076  if ((sub->command.details.sub.optlist = malloc(sizeof(MQTTSubscribe_options) * count)) == NULL)
4077  {
4078  rc = PAHO_MEMORY_ERROR;
/* NOTE(review): `sub` is not released on this path in the visible code. */
4079  goto exit;
4080  }
4081  if (response->subscribeOptionsCount == 0)
4082  {
/* NOTE(review): listing line 4083 is missing; presumably it declares `initialized` - confirm. */
4084  for (i = 0; i < count; ++i)
4085  sub->command.details.sub.optlist[i] = initialized;
4086  }
4087  else
4088  {
4089  for (i = 0; i < count; ++i)
4090  sub->command.details.sub.optlist[i] = response->subscribeOptionsList[i];
4091  }
4092  }
4093  }
4094  }
4095  sub->command.type = SUBSCRIBE;
4096  sub->command.details.sub.count = count;
4097  sub->command.details.sub.topics = malloc(sizeof(char*) * count);
4098  sub->command.details.sub.qoss = malloc(sizeof(int) * count);
4099  if (sub->command.details.sub.topics && sub->command.details.sub.qoss)
4100  {
4101  for (i = 0; i < count; ++i)
4102  {
4103  if ((sub->command.details.sub.topics[i] = MQTTStrdup(topic[i])) == NULL)
4104  {
4105  rc = PAHO_MEMORY_ERROR;
/* NOTE(review): `sub`, its arrays and the topics duplicated so far are not released on this path in the visible code. */
4106  goto exit;
4107  }
4108  sub->command.details.sub.qoss[i] = qos[i];
4109  }
/* note: sizeof(sub) is the size of the pointer, not of the command structure */
4110  rc = MQTTAsync_addCommand(sub, sizeof(sub));
4111  }
4112  else
4113  rc = PAHO_MEMORY_ERROR;
4114 
4115 exit:
4116  FUNC_EXIT_RC(rc);
4117  return rc;
4118 }
4119 
4120 
4121 int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response)
4122 {
4123  int rc = 0;
4124  FUNC_ENTRY;
4125  rc = MQTTAsync_subscribeMany(handle, 1, (char * const *)(&topic), &qos, response);
4126  FUNC_EXIT_RC(rc);
4127  return rc;
4128 }
4129 
4130 
4132 {
4133  MQTTAsyncs* m = handle;
4134  int i = 0;
4135  int rc = MQTTASYNC_SUCCESS;
4136  MQTTAsync_queuedCommand* unsub;
4137  int msgid = 0;
4138 
4139  FUNC_ENTRY;
4140  if (m == NULL || m->c == NULL)
4141  rc = MQTTASYNC_FAILURE;
4142  else if (m->c->connected == 0)
4144  else for (i = 0; i < count; i++)
4145  {
4146  if (!UTF8_validateString(topic[i]))
4147  {
4149  break;
4150  }
4151  }
4152  if (rc != MQTTASYNC_SUCCESS)
4153  ; /* don't overwrite a previous error code */
4154  else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
4156  else if (response)
4157  {
4158  if (m->c->MQTTVersion >= MQTTVERSION_5)
4159  {
4160  if (response->struct_version == 0 || response->onFailure || response->onSuccess)
4162  }
4163  else if (m->c->MQTTVersion < MQTTVERSION_5)
4164  {
4165  if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
4167  }
4168  }
4169  if (rc != MQTTASYNC_SUCCESS)
4170  goto exit;
4171 
4172  /* Add unsubscribe request to operation queue */
4173  if ((unsub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
4174  {
4175  rc = PAHO_MEMORY_ERROR;
4176  goto exit;
4177  }
4178  memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand));
4179  unsub->client = m;
4180  unsub->command.type = UNSUBSCRIBE;
4181  unsub->command.token = msgid;
4182  if (response)
4183  {
4184  unsub->command.onSuccess = response->onSuccess;
4185  unsub->command.onFailure = response->onFailure;
4186  unsub->command.onSuccess5 = response->onSuccess5;
4187  unsub->command.onFailure5 = response->onFailure5;
4188  unsub->command.context = response->context;
4189  response->token = unsub->command.token;
4190  if (m->c->MQTTVersion >= MQTTVERSION_5)
4191  unsub->command.properties = MQTTProperties_copy(&response->properties);
4192  }
4193  unsub->command.details.unsub.count = count;
4194  if ((unsub->command.details.unsub.topics = malloc(sizeof(char*) * count)) == NULL)
4195  {
4196  rc = PAHO_MEMORY_ERROR;
4197  goto exit;
4198  }
4199  for (i = 0; i < count; ++i)
4200  unsub->command.details.unsub.topics[i] = MQTTStrdup(topic[i]);
4201  rc = MQTTAsync_addCommand(unsub, sizeof(unsub));
4202 
4203 exit:
4204  FUNC_EXIT_RC(rc);
4205  return rc;
4206 }
4207 
4208 
4210 {
4211  int rc = 0;
4212  FUNC_ENTRY;
4213  rc = MQTTAsync_unsubscribeMany(handle, 1, (char * const *)(&topic), response);
4214  FUNC_EXIT_RC(rc);
4215  return rc;
4216 }
4217 
4218 
4220 {
4221  int count = 0;
4222 
4224  count = m->noBufferedMessages;
4226  return count;
4227 }
4228 
4229 
/**
 * Validate a publish request and queue a PUBLISH command holding private
 * copies of the destination name and payload.
 * @param handle the client handle, used as an MQTTAsyncs pointer
 * @param destinationName the topic to publish to; checked with
 * UTF8_validateString and duplicated with MQTTStrdup
 * @param payloadlen number of bytes copied from payload
 * @param payload the data to publish
 * @param qos the QoS of the publication; must be in the range 0..2
 * @param retained stored in the queued command as the retained setting
 * @param response optional response options; its callbacks and context are
 * copied into the command and its token field is set to the command's token
 * @return MQTTASYNC_FAILURE for a NULL handle or client structure,
 * MQTTASYNC_BAD_QOS for an out-of-range QoS, PAHO_MEMORY_ERROR on allocation
 * failure, otherwise the result of MQTTAsync_addCommand. Several other error
 * codes are assigned on lines that are not visible in this listing.
 */
4230 int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen, const void* payload,
4231  int qos, int retained, MQTTAsync_responseOptions* response)
4232 {
4233  int rc = MQTTASYNC_SUCCESS;
4234  MQTTAsyncs* m = handle;
/* NOTE(review): the declaration of `pub` (listing line 4235) is missing from this listing. */
4236  int msgid = 0;
4237 
4238  FUNC_ENTRY;
4239  if (m == NULL || m->c == NULL)
4240  rc = MQTTASYNC_FAILURE;
4241  else if (m->c->connected == 0)
4242  {
/* while disconnected, the outcome depends on the create options (sendWhileDisconnected) */
4243  if (m->createOptions == NULL)
/* NOTE(review): listing line 4244 (the body of the test above) is missing. */
4245  else if (m->createOptions->sendWhileDisconnected == 0)
/* NOTE(review): listing lines 4246-4248 are missing. */
4249  }
4250 
4251  if (rc != MQTTASYNC_SUCCESS)
4252  goto exit;
4253 
4254  if (!UTF8_validateString(destinationName))
/* NOTE(review): listing line 4255 (the body of the test above) is missing. */
4256  else if (qos < 0 || qos > 2)
4257  rc = MQTTASYNC_BAD_QOS;
/* a message id is only assigned when qos > 0; for QoS 0 msgid stays 0 */
4258  else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0)
/* NOTE(review): listing line 4259 (the body of the branch above) is missing. */
4260  else if (m->createOptions &&
/* NOTE(review): listing lines 4261-4263 (the rest of this condition and its body) are missing. */
4264  else if (response)
4265  {
/* check that the callbacks supplied match the client's MQTT version */
4266  if (m->c->MQTTVersion >= MQTTVERSION_5)
4267  {
4268  if (response->struct_version == 0 || response->onFailure || response->onSuccess)
/* NOTE(review): listing line 4269 (the body of the test above) is missing. */
4270  }
4271  else if (m->c->MQTTVersion < MQTTVERSION_5)
4272  {
4273  if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
/* NOTE(review): listing line 4274 (the body of the test above) is missing. */
4275  }
4276  }
4277 
4278  if (rc != MQTTASYNC_SUCCESS)
4279  goto exit;
4280 
4281  /* Add publish request to operation queue */
4282  if ((pub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
4283  {
4284  rc = PAHO_MEMORY_ERROR;
4285  goto exit;
4286  }
4287  memset(pub, '\0', sizeof(MQTTAsync_queuedCommand));
4288  pub->client = m;
4289  pub->command.type = PUBLISH;
4290  pub->command.token = msgid;
4291  if (response)
4292  {
4293  pub->command.onSuccess = response->onSuccess;
4294  pub->command.onFailure = response->onFailure;
4295  pub->command.onSuccess5 = response->onSuccess5;
4296  pub->command.onFailure5 = response->onFailure5;
4297  pub->command.context = response->context;
/* hand the command's token back to the caller through the response structure */
4298  response->token = pub->command.token;
4299  if (m->c->MQTTVersion >= MQTTVERSION_5)
4300  pub->command.properties = MQTTProperties_copy(&response->properties);
4301  }
/* the command keeps its own copies of the destination name and payload */
4302  if ((pub->command.details.pub.destinationName = MQTTStrdup(destinationName)) == NULL)
4303  {
4304  free(pub);
4305  rc = PAHO_MEMORY_ERROR;
4306  goto exit;
4307  }
4308  pub->command.details.pub.payloadlen = payloadlen;
4309  if ((pub->command.details.pub.payload = malloc(payloadlen)) == NULL)
4310  {
4311  free(pub->command.details.pub.destinationName);
4312  free(pub);
4313  rc = PAHO_MEMORY_ERROR;
4314  goto exit;
4315  }
4316  memcpy(pub->command.details.pub.payload, payload, payloadlen);
4317  pub->command.details.pub.qos = qos;
4318  pub->command.details.pub.retained = retained;
/* note: sizeof(pub) is the size of the pointer, not of the command structure */
4319  rc = MQTTAsync_addCommand(pub, sizeof(pub));
4320 
4321 exit:
4322  FUNC_EXIT_RC(rc);
4323  return rc;
4324 }
4325 
4326 
4327 
/**
 * Publish the contents of an MQTTAsync_message structure by delegating to
 * MQTTAsync_send.
 * @param handle the client handle, used as an MQTTAsyncs pointer
 * @param destinationName passed through to MQTTAsync_send
 * @param message the message to send; its struct_id must begin with "MQTM"
 * and its struct_version must be 0 or 1
 * @param response optional response options; for a client at MQTTVERSION_5 or
 * above its properties field is overwritten with message->properties before
 * the send
 * @return the result of MQTTAsync_send, or the code set by one of the
 * validation branches (those assignments are not visible in this listing)
 */
4328 int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, const MQTTAsync_message* message,
4329  MQTTAsync_responseOptions* response)
4330 {
4331  int rc = MQTTASYNC_SUCCESS;
4332  MQTTAsyncs* m = handle;
4333 
4334  FUNC_ENTRY;
4335  if (message == NULL)
4336  {
/* NOTE(review): listing line 4337 is missing; presumably it sets rc - confirm. */
4338  goto exit;
4339  }
4340  if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
4341  (message->struct_version != 0 && message->struct_version != 1))
4342  {
/* NOTE(review): listing line 4343 is missing; presumably it sets rc - confirm. */
4344  goto exit;
4345  }
4346 
/* NOTE(review): m and m->c are dereferenced here without a NULL check in this function;
 * MQTTAsync_send only performs that check after this point. */
4347  if (m->c->MQTTVersion >= MQTTVERSION_5 && response)
4348  response->properties = message->properties;
4349 
4350  rc = MQTTAsync_send(handle, destinationName, message->payloadlen, message->payload,
4351  message->qos, message->retained, response);
4352 exit:
4353  FUNC_EXIT_RC(rc);
4354  return rc;
4355 }
4356 
4357 
/**
 * Call MQTTProtocol_retry with the current time. Its second argument is 1
 * only when the time since the previous such pass (as measured by
 * MQTTTime_difftime) exceeds retryLoopInterval * 1000; otherwise it is 0.
 * The time of the previous pass is kept in a function-local static.
 */
4358 static void MQTTAsync_retry(void)
4359 {
4360  static START_TIME_TYPE last = START_TIME_ZERO;
4361  START_TIME_TYPE now;
4362 
4363  FUNC_ENTRY;
4364  now = MQTTTime_now();
4365  if (MQTTTime_difftime(now, last) > (DIFF_TIME_TYPE)(retryLoopInterval * 1000))
4366  {
/* record the start of this pass; note this samples the clock again rather than reusing `now` */
4367  last = MQTTTime_now();
/* NOTE(review): listing line 4368 is missing here. */
4369  MQTTProtocol_retry(now, 1, 0);
4370  }
4371  else
4372  MQTTProtocol_retry(now, 0, 0);
4373  FUNC_EXIT;
4374 }
4375 
4376 
4378 {
4379  int rc = -1;
4380  char* serverURI = m->serverURI;
4381 #if defined(OPENSSL)
4382  int default_port = MQTT_DEFAULT_PORT;
4383 #endif
4384 
4385  FUNC_ENTRY;
4386  if (m->serverURIcount > 0)
4387  {
4388  serverURI = m->serverURIs[m->connect.details.conn.currentURI];
4389 
4390  /* skip URI scheme */
4391  if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
4392  serverURI += strlen(URI_TCP);
4393  else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
4394  {
4395  serverURI += strlen(URI_WS);
4396 #if defined(OPENSSL)
4397  default_port = WS_DEFAULT_PORT;
4398 #endif
4399  }
4400 #if defined(OPENSSL)
4401  else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
4402  {
4403  serverURI += strlen(URI_SSL);
4404  default_port = SECURE_MQTT_DEFAULT_PORT;
4405  }
4406  else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
4407  {
4408  serverURI += strlen(URI_WSS);
4409  default_port = WS_DEFAULT_PORT;
4410  }
4411 #endif
4412  }
4413 
4414  if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - check for completion */
4415  {
4416  int error;
4417  socklen_t len = sizeof(error);
4418 
4419  if ((rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
4420  rc = error;
4421 
4422  if (rc != 0)
4423  goto exit;
4424 
4426 
4427 #if defined(OPENSSL)
4428  if (m->ssl)
4429  {
4430  int port;
4431  size_t hostname_len;
4432  int setSocketForSSLrc = 0;
4433 
4434  if (m->websocket && m->c->net.https_proxy) {
4436  if ((rc = WebSocket_proxy_connect( &m->c->net, 1, serverURI)) == SOCKET_ERROR )
4437  goto exit;
4438  }
4439 
4440  hostname_len = MQTTProtocol_addressPort(serverURI, &port, NULL, default_port);
4441  setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts,
4442  serverURI, hostname_len);
4443 
4444  if (setSocketForSSLrc != MQTTASYNC_SUCCESS)
4445  {
4446  if (m->c->session != NULL)
4447  if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
4448  Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
4449  rc = m->c->sslopts->struct_version >= 3 ?
4450  SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
4451  m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
4452  SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
4453  m->c->sslopts->verify, NULL, NULL);
4454  if (rc == TCPSOCKET_INTERRUPTED)
4455  {
4456  rc = MQTTCLIENT_SUCCESS; /* the connect is still in progress */
4458  }
4459  else if (rc == SSL_FATAL)
4460  {
4461  rc = SOCKET_ERROR;
4462  goto exit;
4463  }
4464  else if (rc == 1)
4465  {
4466  if ( m->websocket )
4467  {
4469  if ((rc = WebSocket_connect(&m->c->net, serverURI)) == SOCKET_ERROR )
4470  goto exit;
4471  }
4472  else
4473  {
4474  rc = MQTTCLIENT_SUCCESS;
4476  if (MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion,
4477  m->connectProps, m->willProps) == SOCKET_ERROR)
4478  {
4479  rc = SOCKET_ERROR;
4480  goto exit;
4481  }
4482  }
4483  if (!m->c->cleansession && m->c->session == NULL)
4484  m->c->session = SSL_get1_session(m->c->net.ssl);
4485  }
4486  }
4487  else
4488  {
4489  rc = SOCKET_ERROR;
4490  goto exit;
4491  }
4492  }
4493  else
4494  {
4495 #endif
4496  if ( m->websocket )
4497  {
4498  if (m->c->net.http_proxy) {
4500  if ((rc = WebSocket_proxy_connect( &m->c->net, 0, serverURI)) == SOCKET_ERROR )
4501  goto exit;
4502  }
4503 
4505  if ((rc = WebSocket_connect(&m->c->net, serverURI)) == SOCKET_ERROR )
4506  goto exit;
4507  }
4508  else
4509  {
4510  m->c->connect_state = WAIT_FOR_CONNACK; /* TCP/SSL connect completed, in which case send the MQTT connect packet */
4511  if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion,
4512  m->connectProps, m->willProps)) == SOCKET_ERROR)
4513  goto exit;
4514  }
4515 #if defined(OPENSSL)
4516  }
4517 #endif
4518  }
4519 #if defined(OPENSSL)
4520  else if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
4521  {
4522  rc = m->c->sslopts->struct_version >= 3 ?
4523  SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
4524  m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
4525  SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
4526  m->c->sslopts->verify, NULL, NULL);
4527  if (rc != 1)
4528  goto exit;
4529 
4530  if(!m->c->cleansession && m->c->session == NULL)
4531  m->c->session = SSL_get1_session(m->c->net.ssl);
4532 
4533  if ( m->websocket )
4534  {
4536  if ((rc = WebSocket_connect(&m->c->net, serverURI)) == SOCKET_ERROR )
4537  goto exit;
4538  }
4539  else
4540  {
4541  m->c->connect_state = WAIT_FOR_CONNACK; /* SSL connect completed, in which case send the MQTT connect packet */
4542  if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion,
4543  m->connectProps, m->willProps)) == SOCKET_ERROR)
4544  goto exit;
4545  }
4546  }
4547 #endif
4548  else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS) /* Websocket connect sent - wait for completion */
4549  {
4550  if ((rc = WebSocket_upgrade( &m->c->net ) ) == SOCKET_ERROR )
4551  goto exit;
4552  else if (rc != TCPSOCKET_INTERRUPTED)
4553  {
4554  m->c->connect_state = WAIT_FOR_CONNACK; /* Websocket upgrade completed, in which case send the MQTT connect packet */
4555  if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion, m->connectProps, m->willProps)) == SOCKET_ERROR)
4556  goto exit;
4557  }
4558  }
4559 
4560 exit:
4561  if ((rc != 0 && rc != TCPSOCKET_INTERRUPTED && (m->c->connect_state != SSL_IN_PROGRESS && m->c->connect_state != WEBSOCKET_IN_PROGRESS)) || (rc == SSL_FATAL))
4562  nextOrClose(m, MQTTASYNC_FAILURE, "TCP/TLS connect failure");
4563 
4564  FUNC_EXIT_RC(rc);
4565  return rc;
4566 }
4567 
4568 
4569 static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
4570 {
4571  struct timeval tp = {0L, 0L};
4572  MQTTPacket* pack = NULL;
4573 
4574  FUNC_ENTRY;
4575  if (timeout > 0L)
4576  {
4577  tp.tv_sec = timeout / 1000;
4578  tp.tv_usec = (timeout % 1000) * 1000; /* this field is microseconds! */
4579  }
4580 
4581 #if defined(OPENSSL)
4582  if ((*sock = SSLSocket_getPendingRead()) == -1)
4583  {
4584 #endif
4585  /* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
4586  *sock = Socket_getReadySocket(0, &tp,socket_mutex);
4587  if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
4588  MQTTAsync_sleep(100L);
4589 #if defined(OPENSSL)
4590  }
4591 #endif
4593  if (*sock > 0)
4594  {
4595  MQTTAsyncs* m = NULL;
4596  if (ListFindItem(handles, sock, clientSockCompare) != NULL)
4597  m = (MQTTAsync)(handles->current->content);
4598  if (m != NULL)
4599  {
4600  Log(TRACE_MINIMUM, -1, "m->c->connect_state = %d", m->c->connect_state);
4602  *rc = MQTTAsync_connecting(m);
4603  else
4604  pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
4605  if (m->c->connect_state == WAIT_FOR_CONNACK && *rc == SOCKET_ERROR)
4606  {
4607  Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
4608  nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect completion failure");
4609  }
4610  }
4611  if (pack)
4612  {
4613  int freed = 1;
4614 
4615  /* Note that these handle... functions free the packet structure that they are dealing with */
4616  if (pack->header.bits.type == PUBLISH)
4617  *rc = MQTTProtocol_handlePublishes(pack, *sock);
4618  else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP ||
4619  pack->header.bits.type == PUBREC)
4620  {
4621  int msgid = 0,
4622  msgtype = 0,
4623  ackrc = 0,
4624  mqttversion = 0;
4626 
4627  /* This block is so that the ack variable is local and isn't accidentally reused */
4628  {
4629  static Ack ack;
4630  ack = *(Ack*)pack;
4631  /* these values are stored because the packet structure is freed in the handle functions */
4632  msgid = ack.msgId;
4633  msgtype = pack->header.bits.type;
4634  if (ack.MQTTVersion >= MQTTVERSION_5)
4635  {
4636  ackrc = ack.rc;
4637  msgprops = MQTTProperties_copy(&ack.properties);
4638  mqttversion = ack.MQTTVersion;
4639  }
4640  }
4641 
4642  if (pack->header.bits.type == PUBCOMP)
4643  *rc = MQTTProtocol_handlePubcomps(pack, *sock);
4644  else if (pack->header.bits.type == PUBREC)
4645  *rc = MQTTProtocol_handlePubrecs(pack, *sock);
4646  else if (pack->header.bits.type == PUBACK)
4647  *rc = MQTTProtocol_handlePubacks(pack, *sock);
4648  if (!m)
4649  Log(LOG_ERROR, -1, "PUBCOMP, PUBACK or PUBREC received for no client, msgid %d", msgid);
4650  if (m && (msgtype != PUBREC || ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR))
4651  {
4652  ListElement* current = NULL;
4653 
4654  if (m->dc)
4655  {
4656  Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
4657  (*(m->dc))(m->dcContext, msgid);
4658  }
4659  /* use the msgid to find the callback to be called */
4660  while (ListNextElement(m->responses, &current))
4661  {
4663  if (command->command.token == msgid)
4664  {
4665  if (!ListDetach(m->responses, command)) /* then remove the response from the list */
4666  Log(LOG_ERROR, -1, "Publish command not removed from command list");
4667  if (command->command.onSuccess)
4668  {
4670 
4671  data.token = command->command.token;
4672  data.alt.pub.destinationName = command->command.details.pub.destinationName;
4673  data.alt.pub.message.payload = command->command.details.pub.payload;
4674  data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
4675  data.alt.pub.message.qos = command->command.details.pub.qos;
4676  data.alt.pub.message.retained = command->command.details.pub.retained;
4677  Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
4678  (*(command->command.onSuccess))(command->command.context, &data);
4679  }
4680  else if (command->command.onSuccess5 && ackrc < MQTTREASONCODE_UNSPECIFIED_ERROR)
4681  {
4683 
4684  data.token = command->command.token;
4685  data.alt.pub.destinationName = command->command.details.pub.destinationName;
4686  data.alt.pub.message.payload = command->command.details.pub.payload;
4687  data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
4688  data.alt.pub.message.qos = command->command.details.pub.qos;
4689  data.alt.pub.message.retained = command->command.details.pub.retained;
4690  data.properties = command->command.properties;
4691  Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
4692  (*(command->command.onSuccess5))(command->command.context, &data);
4693  }
4694  else if (command->command.onFailure5 && ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
4695  {
4697 
4698  data.token = command->command.token;
4699  data.reasonCode = ackrc;
4700  data.properties = msgprops;
4701  data.packet_type = pack->header.bits.type;
4702  Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
4703  (*(command->command.onFailure5))(command->command.context, &data);
4704  }
4705  MQTTAsync_freeCommand(command);
4706  break;
4707  }
4708  }
4709  if (mqttversion >= MQTTVERSION_5)
4710  MQTTProperties_free(&msgprops);
4711  }
4712  }
4713  else if (pack->header.bits.type == PUBREL)
4714  *rc = MQTTProtocol_handlePubrels(pack, *sock);
4715  else if (pack->header.bits.type == PINGRESP)
4716  *rc = MQTTProtocol_handlePingresps(pack, *sock);
4717  else
4718  freed = 0;
4719  if (freed)
4720  pack = NULL;
4721  }
4722  }
4723  MQTTAsync_retry();
4725  FUNC_EXIT_RC(*rc);
4726  return pack;
4727 }
4728 
4729 /*
4730 static int pubCompare(void* a, void* b)
4731 {
4732  Messages* msg = (Messages*)a;
4733  return msg->publish == (Publications*)b;
4734 }*/
4735 
4736 
/*
 * MQTTAsync_getPendingTokens - collect the tokens still outstanding for a client.
 *
 * The signature line is absent from this listing; the cross-reference index
 * further down records it as
 *   int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
 *
 * On return *tokens is either NULL (bad handle, nothing pending, or allocation
 * failure) or a malloc'd array holding, in order, the token of every entry in
 * the global `commands` list that belongs to this client, then the msgid of
 * every entry in the client's outboundMsgs list, then a -1 terminator.
 * Presumably the caller releases the array - confirm against the public API docs.
 *
 * Returns MQTTASYNC_SUCCESS, MQTTASYNC_FAILURE when handle is NULL, or
 * PAHO_MEMORY_ERROR when the array cannot be allocated.
 *
 * NOTE(review): the embedded listing numbers skip 4745, 4757, 4778 and 4797.
 * The declarations of `cmd` and whatever sat around FUNC_ENTRY / the exit label
 * (possibly locking - unverified) are not visible here; check the real source.
 * NOTE(review): `tokens` itself is dereferenced without a NULL check.
 */
4738 {
4739  int rc = MQTTASYNC_SUCCESS;
4740  MQTTAsyncs* m = handle;
4741  ListElement* current = NULL;
4742  int count = 0;
4743 
4744  FUNC_ENTRY;
/* start from "no tokens" so every early exit leaves a well-defined result */
4746  *tokens = NULL;
4747 
4748  if (m == NULL)
4749  {
4750  rc = MQTTASYNC_FAILURE;
4751  goto exit;
4752  }
4753 
4754  /* calculate the number of pending tokens - commands plus inflight */
4755  while (ListNextElement(commands, &current))
4756  {
4758 
4759  if (cmd->client == m)
4760  count++;
4761  }
4762  if (m->c)
4763  count += m->c->outboundMsgs->count;
4764  if (count == 0)
4765  goto exit; /* no tokens to return */
4766  *tokens = malloc(sizeof(MQTTAsync_token) * (count + 1)); /* add space for sentinel at end of list */
4767  if (!*tokens)
4768  {
4769  rc = PAHO_MEMORY_ERROR;
4770  goto exit;
4771  }
4772 
4773  /* First add the unprocessed commands to the pending tokens */
/* second pass: rewind the iterator and reuse count as the write index */
4774  current = NULL;
4775  count = 0;
4776  while (ListNextElement(commands, &current))
4777  {
4779 
4780  if (cmd->client == m)
4781  (*tokens)[count++] = cmd->command.token;
4782  }
4783 
4784  /* Now add the inflight messages */
4785  if (m->c && m->c->outboundMsgs->count > 0)
4786  {
4787  current = NULL;
4788  while (ListNextElement(m->c->outboundMsgs, &current))
4789  {
/* note: this inner `m` (a Messages*) shadows the client handle `m` inside the loop body */
4790  Messages* m = (Messages*)(current->content);
4791  (*tokens)[count++] = m->msgid;
4792  }
4793  }
4794  (*tokens)[count] = -1; /* indicate end of list */
4795 
4796 exit:
4798  FUNC_EXIT_RC(rc);
4799  return rc;
4800 }
4801 
4802 
/*
 * MQTTAsync_isComplete - report whether a token is still pending for a client.
 *
 * The signature line is absent from this listing; the cross-reference index
 * further down records it as
 *   int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
 *
 * The token counts as pending if it matches a queued entry in the global
 * `commands` list for this client, or the msgid of an entry in the client's
 * outboundMsgs list.
 *
 * Returns MQTTASYNC_TRUE when the token is found in neither list,
 * MQTTASYNC_SUCCESS (the initial rc) when it is still pending, and
 * MQTTASYNC_FAILURE when handle is NULL. Callers therefore have to compare the
 * result against MQTTASYNC_TRUE rather than test it for "non-zero".
 *
 * NOTE(review): the embedded listing numbers skip 4810, 4822 and 4842. The
 * declaration of `cmd` and whatever sat around FUNC_ENTRY / the exit label are
 * not visible here; check the real source.
 */
4804 {
4805  int rc = MQTTASYNC_SUCCESS;
4806  MQTTAsyncs* m = handle;
4807  ListElement* current = NULL;
4808 
4809  FUNC_ENTRY;
4811 
4812  if (m == NULL)
4813  {
4814  rc = MQTTASYNC_FAILURE;
4815  goto exit;
4816  }
4817 
4818  /* First check unprocessed commands */
4819  current = NULL;
4820  while (ListNextElement(commands, &current))
4821  {
4823 
/* found in the queue: leave with rc still MQTTASYNC_SUCCESS, i.e. "not complete" */
4824  if (cmd->client == m && cmd->command.token == dt)
4825  goto exit;
4826  }
4827 
4828  /* Now check the inflight messages */
4829  if (m->c && m->c->outboundMsgs->count > 0)
4830  {
4831  current = NULL;
4832  while (ListNextElement(m->c->outboundMsgs, &current))
4833  {
/* note: this inner `m` (a Messages*) shadows the client handle `m` inside the loop body */
4834  Messages* m = (Messages*)(current->content);
4835  if (m->msgid == dt)
4836  goto exit;
4837  }
4838  }
4839  rc = MQTTASYNC_TRUE; /* Can't find it, so it must be complete */
4840 
4841 exit:
4843  FUNC_EXIT_RC(rc);
4844  return rc;
4845 }
4846 
4847 
/*
 * MQTTAsync_waitForCompletion - block until a token completes or a timeout expires.
 *
 * handle:  client handle; NULL, or a handle whose `c` member is NULL, fails at once.
 * dt:      token to wait for.
 * timeout: upper bound on the wait, compared against MQTTTime_elapsed(start)
 *          (presumably milliseconds - confirm against MQTTTime).
 *
 * Polls MQTTAsync_isComplete() after each MQTTTime_sleep(100) until it returns 1
 * or the elapsed time reaches `timeout`.
 *
 * Returns MQTTASYNC_SUCCESS once the token is no longer pending, and
 * MQTTASYNC_FAILURE for an invalid handle or when the timeout expires first.
 *
 * NOTE(review): the embedded listing numbers skip 4851-4852, 4856, 4860,
 * 4866-4867, 4870, 4884 and 4886-4887, so this listing is incomplete:
 *  - the declarations/initialisation of `start` and `elapsed` are not shown;
 *  - the body of the "not connected" branches is partly missing (the index
 *    below defines MQTTASYNC_DISCONNECTED, which is probably assigned there -
 *    unverified);
 *  - as printed, the `if (m->c->connected == 0)` inside the loop appears to
 *    guard the `elapsed = ...` update, which is almost certainly an artefact of
 *    the dropped lines rather than the real control flow.
 * Check the real source before relying on any of this.
 */
4848 int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
4849 {
4850  int rc = MQTTASYNC_FAILURE;
4853  MQTTAsyncs* m = handle;
4854 
4855  FUNC_ENTRY;
4857 
4858  if (m == NULL || m->c == NULL)
4859  {
4861  rc = MQTTASYNC_FAILURE;
4862  goto exit;
4863  }
4864  if (m->c->connected == 0)
4865  {
4868  goto exit;
4869  }
4871 
/* fast path: nothing to wait for if the token is already gone from both lists */
4872  if (MQTTAsync_isComplete(handle, dt) == 1)
4873  {
4874  rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
4875  goto exit;
4876  }
4877 
4878  elapsed = MQTTTime_elapsed(start);
/* rc doubles as the loop flag: any value other than MQTTASYNC_FAILURE ends the wait */
4879  while (elapsed < timeout && rc == MQTTASYNC_FAILURE)
4880  {
4881  MQTTTime_sleep(100);
4882  if (MQTTAsync_isComplete(handle, dt) == 1)
4883  rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
4885  if (m->c->connected == 0)
4888  elapsed = MQTTTime_elapsed(start);
4889  }
4890 exit:
4891  FUNC_EXIT_RC(rc);
4892  return rc;
4893 }
4894 
4895 
4896 
/*
 * MQTTAsync_setTraceLevel - set the library's trace level.
 *
 * The signature line is absent from this listing; the cross-reference index
 * further down records it as
 *   void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
 *
 * Forwards to Log_setTraceLevel(), casting the public enum straight to the
 * internal enum LOG_LEVELS. The cast presumably relies on the two enums using
 * the same numeric values - confirm in MQTTAsync.h and Log.h.
 */
4898 {
4899  Log_setTraceLevel((enum LOG_LEVELS)level);
4900 }
4901 
4902 
/*
 * MQTTAsync_setTraceCallback - install a trace callback.
 *
 * The signature line is absent from this listing; the cross-reference index
 * further down records it as
 *   void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
 *
 * NOTE(review): listing line 4905, the only statement of the body, is missing
 * here. It presumably forwards `callback` to Log_setTraceCallback() - confirm
 * against the real source; as printed the function does nothing.
 */
4904 {
4906 }
4907 
4908 
/*
 * MQTTAsync_getVersionInfo - describe the library build as name/value pairs.
 *
 * The signature line is absent from this listing; the cross-reference index
 * further down records it as
 *   MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
 *
 * Returns a pointer to a function-local static array, so the caller must not
 * free it and every call rewrites and returns the same storage. The list ends
 * with an entry whose name and value are both NULL.
 *
 * Three entries are always present (product name, version, build level); five
 * OpenSSL entries are added when OPENSSL is defined. That is eight in total,
 * matching MAX_INFO_STRINGS, with the "+ 1" slot reserved for the terminator.
 */
4910 {
4911  #define MAX_INFO_STRINGS 8
4912  static MQTTAsync_nameValue libinfo[MAX_INFO_STRINGS + 1];
4913  int i = 0;
4914 
/* each pair sets .name at index i, then sets .value and advances i in one step */
4915  libinfo[i].name = "Product name";
4916  libinfo[i++].value = "Eclipse Paho Asynchronous MQTT C Client Library";
4917 
4918  libinfo[i].name = "Version";
4919  libinfo[i++].value = CLIENT_VERSION;
4920 
4921  libinfo[i].name = "Build level";
4922  libinfo[i++].value = BUILD_TIMESTAMP;
4923 #if defined(OPENSSL)
4924  libinfo[i].name = "OpenSSL version";
4925  libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
4926 
4927  libinfo[i].name = "OpenSSL flags";
4928  libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
4929 
4930  libinfo[i].name = "OpenSSL build timestamp";
4931  libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
4932 
4933  libinfo[i].name = "OpenSSL platform";
4934  libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
4935 
4936  libinfo[i].name = "OpenSSL directory";
4937  libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
4938 #endif
/* terminator entry: consumers walk the array until name == NULL */
4939  libinfo[i].name = NULL;
4940  libinfo[i].value = NULL;
4941  return libinfo;
4942 }
4943 
4944 const char* MQTTAsync_strerror(int code)
4945 {
4946  static char buf[30];
4947 
4948  switch (code) {
4949  case MQTTASYNC_SUCCESS:
4950  return "Success";
4951  case MQTTASYNC_FAILURE:
4952  return "Failure";
4954  return "Persistence error";
4956  return "Disconnected";
4958  return "Maximum in-flight messages amount reached";
4960  return "Invalid UTF8 string";
4962  return "Invalid (NULL) parameter";
4964  return "Topic containing NULL characters has been truncated";
4966  return "Bad structure";
4967  case MQTTASYNC_BAD_QOS:
4968  return "Invalid QoS value";
4970  return "Too many pending commands";
4972  return "Operation discarded before completion";
4974  return "No more messages can be buffered";
4976  return "SSL is not supported";
4978  return "Invalid protocol scheme";
4980  return "Options for wrong MQTT version";
4982  return "Client created for another version of MQTT";
4984  return "Zero length will topic on connect";
4985  }
4986 
4987  sprintf(buf, "Unknown error code %d", code);
4988  return buf;
4989 }
static void MQTTAsync_unlock_mutex(mutex_type amutex)
Definition: MQTTAsync.c:551
static int clientCompareConnectCommand(void *a, void *b)
Definition: MQTTAsync.c:1403
static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command *command)
Definition: MQTTAsync.c:1592
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
int Thread_post_sem(sem_type sem)
Definition: Thread.c:313
MQTTProperties properties
Definition: MQTTAsync.h:316
int WebSocket_proxy_connect(networkHandles *net, int ssl, const char *hostname)
Definition: WebSocket.c:1445
#define PERSISTENCE_PUBLISH_SENT
void MQTTProtocol_removePublication(Publications *p)
MQTTProperties properties
Definition: MQTTPacket.h:176
char * topicName
Definition: MQTTAsync.c:309
List * messageQueue
Definition: Clients.h:137
int MQTTProtocol_handlePubacks(void *pack, int sock)
#define PROXY_CONNECT_IN_PROGRESS
Definition: Clients.h:110
START_TIME_TYPE lastConnectionFailedTime
Definition: MQTTAsync.c:413
const char * keyStore
Definition: MQTTAsync.h:1053
const char * client_timestamp_eye
Definition: MQTTAsync.c:73
int Thread_wait_cond(cond_type condvar, int timeout)
Definition: Thread.c:416
#define LOG_PROTOCOL
Definition: Log.h:64
void MQTTPacket_freeAck(Ack *pack)
Definition: MQTTPacket.c:617
union Connack::@62 flags
int MQTTProperties_read(MQTTProperties *properties, char **pptr, char *enddata)
static pthread_mutex_t mqttcommand_mutex_store
Definition: MQTTAsync.c:267
MQTTProperties properties
Definition: MQTTPacket.h:155
#define MQTTAsync_successData5_initializer
Definition: MQTTAsync.h:616
#define TRACE_MIN
Definition: Log.h:66
void onSuccess(void *context, MQTTAsync_successData *response)
unsigned int command_seqno
Definition: MQTTAsync.c:394
#define MQTTASYNC_NULL_PARAMETER
Definition: MQTTAsync.h:140
#define DIFF_TIME_TYPE
Definition: MQTTTime.h:41
char * topic
Definition: MQTTPacket.h:200
static mutex_type mqttcommand_mutex
Definition: MQTTAsync.c:268
struct MQTTAsync_successData5::@49::@52 connect
int MQTTProperties_len(MQTTProperties *props)
int MQTTAsync_createWithOptions(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTAsync_createOptions *options)
Definition: MQTTAsync.c:575
unsigned int protos_len
Definition: MQTTAsync.h:1141
#define MQTTASYNC_SSL_NOT_SUPPORTED
Definition: MQTTAsync.h:171
const char * trustStore
Definition: MQTTAsync.h:1048
#define WebSocket_CLOSE_NORMAL
Definition: WebSocket.h:38
void MQTTAsync_traceCallback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
Definition: MQTTAsync.h:1678
void WebSocket_close(networkHandles *net, int status_code, const char *reason)
Definition: WebSocket.c:521
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.h:726
int MQTTProperties_write(char **pptr, const MQTTProperties *properties)
void MQTTAsync_init_rand(void)
Definition: MQTTAsync.c:104
#define MQTTCLIENT_SUCCESS
Definition: MQTTClient.h:131
void MQTTProtocol_freeClient(Clients *client)
MQTTReasonCodes
MQTTPersistence_beforeWrite * beforeWrite
Definition: Clients.h:141
string topic
Definition: test2.py:8
#define FUNC_EXIT
Definition: StackTrace.h:59
static cond_type_struct send_cond_store
Definition: MQTTAsync.c:270
static thread_return_type WINAPI MQTTAsync_receiveThread(void *n)
Definition: MQTTAsync.c:2696
MQTTProperties props
Definition: paho_c_pub.c:54
#define MQTT_DEFAULT_PORT
static mutex_type heap_mutex
Definition: Heap.c:55
int UTF8_validateString(const char *string)
Definition: utf-8.c:156
START_TIME_TYPE start_time
Definition: MQTTAsync.c:323
const char * name
Definition: MQTTAsync.h:1149
int MQTTProtocol_handlePubcomps(void *pack, int sock)
union MQTTAsync_successData::@46 alt
int SSLSocket_setSocketForSSL(networkHandles *net, MQTTClient_SSLOptions *opts, const char *hostname, size_t hostname_len)
void WebSocket_terminate(void)
Definition: WebSocket.c:1257
int msgId
Definition: MQTTPacket.h:217
int MQTTVersion
Definition: Clients.h:146
int sessionExpiry
Definition: Clients.h:147
int msgId
Definition: MQTTPacket.h:202
void Log_setTraceLevel(enum LOG_LEVELS level)
Definition: Log.c:224
int MQTTProtocol_connect(const char *ip_address, Clients *aClient, int websocket, int MQTTVersion, MQTTProperties *connectProperties, MQTTProperties *willProperties)
int MQTTAsync_setConnectionLostCallback(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl)
Definition: MQTTAsync.c:3088
DIFF_TIME_TYPE MQTTTime_difftime(START_TIME_TYPE new, START_TIME_TYPE old)
Definition: MQTTTime.c:99
ListElement * current
Definition: LinkedList.h:69
const char * message
Definition: MQTTAsync.h:996
const char * topicName
Definition: MQTTAsync.h:994
bool retain
Definition: MQTTPacket.h:77
LOG_LEVELS
Definition: Log.h:35
static void MQTTAsync_freeServerURIs(MQTTAsyncs *m)
Definition: MQTTAsync.c:1671
void MQTTProperties_free(MQTTProperties *props)
#define DISCONNECTING
Definition: Clients.h:112
MQTTProperties properties
Definition: MQTTAsync.h:534
#define MQTTASYNC_FAILURE
Definition: MQTTAsync.h:118
MQTTAsync_token token
Definition: MQTTAsync.h:582
void MQTTPacket_freeConnack(Connack *pack)
int Socket_noPendingWrites(int socket)
Definition: Socket.c:417
struct Connack::@62::@63 bits
MQTTProperties properties
Definition: MQTTAsync.h:584
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
void * MQTTAsync_malloc(size_t size)
Definition: MQTTAsync.c:2634
int msgID
Definition: Clients.h:130
#define PAHO_MEMORY_ERROR
Definition: Heap.h:26
struct Header::@59 bits
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
const MQTTClient_nameValue * httpHeaders
Definition: Clients.h:94
struct MQTTAsync_successData5::@49::@53 unsub
#define MQTTASYNC_BAD_UTF8_STRING
Definition: MQTTAsync.h:136
int MQTTPacket_send_connect(Clients *client, int MQTTVersion, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTPacketOut.c:48
int Socket_getReadySocket(int more_work, struct timeval *tp, mutex_type mutex)
Definition: Socket.c:242
MQTTSubscribe_options * subscribeOptionsList
Definition: MQTTAsync.h:743
int MQTTProtocol_handlePublishes(void *pack, int sock)
int Thread_lock_mutex(mutex_type mutex)
Definition: Thread.c:112
char ** serverURIs
Definition: MQTTAsync.c:408
char * payload
Definition: MQTTPacket.h:203
int MQTTPacket_send_disconnect(Clients *client, enum MQTTReasonCodes reason, MQTTProperties *props)
Definition: MQTTPacket.c:508
int SSLSocket_connect(SSL *ssl, int sock, const char *hostname, int verify, int(*cb)(const char *str, size_t len, void *u), void *u)
const char * client_version_eye
Definition: MQTTAsync.c:74
List pending_writes
Definition: MQTTProtocol.h:40
List * outboundMsgs
Definition: Clients.h:136
#define PERSISTENCE_V5_PUBLISH_SENT
void * context
Definition: Clients.h:145
struct pubsub_opts opts
Definition: paho_c_pub.c:42
int MQTTAsync_setDeliveryCompleteCallback(MQTTAsync handle, void *context, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3133
static int MQTTAsync_unpersistInflightMessages(Clients *c)
Definition: MQTTAsync.c:1359
#define PERSISTENCE_V5_PUBLISH_RECEIVED
MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
Definition: MQTTAsync.c:4909
void * MQTTAsync
Definition: MQTTAsync.h:239
static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
Definition: MQTTAsync.c:1723
static int MQTTAsync_checkConn(MQTTAsync_command *command, MQTTAsyncs *client)
Definition: MQTTAsync.c:563
void MQTTAsync_free(void *memory)
Definition: MQTTAsync.c:2626
#define malloc(x)
Definition: Heap.h:41
int automaticReconnect
Definition: MQTTAsync.c:404
#define min(a, b)
Definition: MQTTAsync.c:88
enum MQTTReasonCodes reasonCode
Definition: MQTTAsync.h:583
void * updateConnectOptions_context
Definition: MQTTAsync.c:385
void MQTTAsync_freeMessage(MQTTAsync_message **message)
Definition: MQTTAsync.c:2615
#define MQTTASYNC_DISCONNECTED
Definition: MQTTAsync.h:127
int MQTTAsync_setConnected(MQTTAsync handle, void *context, MQTTAsync_connected *connected)
Definition: MQTTAsync.c:3178
int MQTTAsync_unsubscribe(MQTTAsync handle, const char *topic, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4209
static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
Definition: MQTTAsync.c:1684
static l_noret error(LoadState *S, const char *why)
Definition: lundump.c:40
int topiclen
Definition: MQTTPacket.h:201
ClientStates * bstate
Definition: MQTTAsync.c:117
void MQTTAsync_connectionLost(void *context, char *cause)
Definition: MQTTAsync.h:397
MQTTAsync_connectionLost * cl
Definition: MQTTAsync.c:371
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:720
Header header
Definition: MQTTPacket.h:164
static void MQTTAsync_terminate(void)
Definition: MQTTAsync.c:747
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
Definition: MQTTAsync.c:4903
int ListRemove(List *aList, void *content)
Definition: LinkedList.c:257
struct MQTTAsync_successData::@46::@48 connect
long elapsed(START_TIME_TYPE start_time)
Definition: test1.c:233
#define START_TIME_ZERO
Definition: MQTTTime.h:37
#define TRACE_MAX
Definition: Log.h:65
struct ListElementStruct * next
Definition: LinkedList.h:58
signed int connect_state
Definition: Clients.h:128
START_TIME_TYPE MQTTTime_start_clock(void)
Definition: MQTTTime.c:55
int ListDetach(List *aList, void *content)
Definition: LinkedList.c:245
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
static void MQTTProtocol_checkPendingWrites(void)
Definition: MQTTAsync.c:1648
void Log_setTraceCallback(Log_traceCallback *callback)
Definition: Log.c:218
MQTTProperties properties
Definition: MQTTPacket.h:189
#define PERSISTENCE_V5_COMMAND_KEY
const void * data
Definition: MQTTAsync.h:467
int MQTTProperties_getNumericValue(MQTTProperties *props, enum MQTTPropertyCodes propid)
START_TIME_TYPE lastTouch
Definition: Clients.h:61
MQTTASYNC_TRACE_LEVELS
Definition: MQTTAsync.h:1650
void Protocol_processPublication(Publish *publish, Clients *client, int allocatePayload)
Definition: MQTTAsync.c:3396
union MQTTAsync_successData5::@49 alt
static mutex_type stack_mutex
Definition: StackTrace.c:73
int MQTTAsync_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: MQTTAsync.h:354
#define free(x)
Definition: Heap.h:55
int passwordlen
Definition: Clients.h:121
unsigned int type
Definition: MQTTPacket.h:80
#define PERSISTENCE_V5_QUEUE_KEY
networkHandles net
Definition: Clients.h:129
unsigned int connected
Definition: Clients.h:125
static MQTTPacket * MQTTAsync_cycle(int *sock, unsigned long timeout, int *rc)
Definition: MQTTAsync.c:4569
void * beforeWrite_context
Definition: Clients.h:143
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4121
static pthread_mutex_t socket_mutex_store
Definition: MQTTAsync.c:264
ListElement * ListInsert(List *aList, void *content, size_t size, ListElement *index)
Definition: LinkedList.c:107
#define URI_WSS
Definition: MQTTAsync.c:69
void MQTTAsync_onSuccess5(void *context, MQTTAsync_successData5 *response)
Definition: MQTTAsync.h:647
void * maContext
Definition: MQTTAsync.c:375
#define MQTTASYNC_OPERATION_INCOMPLETE
Definition: MQTTAsync.h:163
struct MQTTAsync_command::@39::@44 conn
void Socket_setWriteCompleteCallback(Socket_writeComplete *mywritecomplete)
Definition: Socket.c:852
const char * message
Definition: MQTTAsync.h:518
static void nextOrClose(MQTTAsyncs *m, int rc, char *message)
Definition: MQTTAsync.c:2235
unsigned int good
Definition: Clients.h:126
int MQTTProtocol_handleUnsubacks(void *pack, int sock)
void Socket_clearPendingWrite(int socket)
Definition: Socket.c:587
void ListEmpty(List *aList)
Definition: LinkedList.c:359
MQTTProperties MQTTProperties_copy(const MQTTProperties *props)
int MQTTAsync_setAfterPersistenceRead(MQTTAsync handle, void *context, MQTTPersistence_afterRead *co)
Definition: MQTTAsync.c:3244
char * serverURI
Definition: MQTTAsync.c:365
void * connected_context
Definition: MQTTAsync.c:379
static List * handles
Definition: MQTTAsync.c:302
static int MQTTAsync_Socket_noPendingWrites(int socket)
Definition: MQTTAsync.c:1634
#define MAX_INFO_STRINGS
struct MQTTAsync_command::@39::@40 sub
List * responses
Definition: MQTTAsync.c:393
MQTTAsync_messageArrived * ma
Definition: MQTTAsync.c:372
constexpr size_t count()
Definition: core.h:960
static int cmdMessageIDCompare(void *a, void *b)
Definition: MQTTAsync.c:3947
void MQTTAsync_sleep(long milliseconds)
Definition: MQTTAsync.c:480
MQTTProperties * connectProps
Definition: MQTTAsync.c:418
#define PERSISTENCE_COMMAND_KEY
void * dcContext
Definition: MQTTAsync.c:376
#define FUNC_EXIT_RC(x)
Definition: StackTrace.h:63
#define SOCKET_ERROR
Definition: Socket.h:76
int topicLen
Definition: MQTTAsync.c:310
List * clients
Definition: Clients.h:163
#define MQTTASYNC_BAD_MQTT_OPTION
Definition: MQTTAsync.h:181
MQTTAsync_updateConnectOptions * updateConnectOptions
Definition: MQTTAsync.c:384
int count
Definition: LinkedList.h:72
ListElement * ListNextElement(List *aList, ListElement **pos)
Definition: LinkedList.c:411
List * reasonCodes
Definition: MQTTPacket.h:190
int MQTTAsync_updateConnectOptions(void *context, MQTTAsync_connectData *data)
Definition: MQTTAsync.h:473
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.h:1419
#define MQTTASYNC_BAD_QOS
Definition: MQTTAsync.h:155
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTAsync.c:737
int keepAliveInterval
Definition: Clients.h:131
bool dup
Definition: MQTTPacket.h:79
void MQTTProtocol_keepalive(START_TIME_TYPE now)
unsigned int seqno
Definition: MQTTAsync.c:311
int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
Definition: MQTTAsync.c:4737
#define SSL_IN_PROGRESS
Definition: Clients.h:104
static void MQTTAsync_checkTimeouts(void)
Definition: MQTTAsync.c:2294
struct MQTTAsync_command::@39::@41 unsub
int MQTTAsync_setDisconnected(MQTTAsync handle, void *context, MQTTAsync_disconnected *disconnected)
Definition: MQTTAsync.c:3156
static mutex_type log_mutex
Definition: Log.c:128
#define max(A, B)
Definition: Socket.h:88
void * disconnected_context
Definition: MQTTAsync.c:382
int payloadlen
Definition: MQTTPacket.h:204
int ListRemoveItem(List *aList, void *content, int(*callback)(void *, void *))
Definition: LinkedList.c:349
MQTTClient_persistence * persistence
Definition: Clients.h:140
#define MQTTAsync_disconnectOptions_initializer
Definition: MQTTAsync.h:1422
void SSLSocket_handleOpensslInit(int bool_value)
int Thread_unlock_mutex(mutex_type mutex)
Definition: Thread.c:133
int payloadlen
Definition: Clients.h:72
static int MQTTAsync_processCommand(void)
Definition: MQTTAsync.c:1841
const char * MQTTPacket_name(int ptype)
Definition: MQTTPacket.c:65
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:702
enum MQTTAsync_threadStates receiveThread_state
Definition: MQTTAsync.c:127
int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char *const *topic, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4131
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.h:1327
#define MQTTAsync_connectData_initializer
Definition: MQTTAsync.h:471
#define MQTTVERSION_3_1_1
Definition: MQTTAsync.h:203
#define URI_TCP
Definition: MQTTAsync.c:67
static thread_return_type WINAPI MQTTAsync_sendThread(void *n)
Definition: MQTTAsync.c:2393
ListElement * elem
Definition: MQTTAsync.c:1146
static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions *options, int internal)
Definition: MQTTAsync.c:3856
ROSLIB_DECL std::string command(const std::string &cmd)
#define MAX_MSG_ID
Definition: MQTTProtocol.h:25
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
Definition: Log.c:417
int WebSocket_upgrade(networkHandles *net)
Definition: WebSocket.c:1306
static mutex_type mqttasync_mutex
Definition: MQTTAsync.c:262
struct MQTTAsync_willOptions::@54 payload
void * MQTTPacket_Factory(int MQTTVersion, networkHandles *net, int *error)
Definition: MQTTPacket.c:103
unsigned char rc
Definition: MQTTPacket.h:218
static List * commands
Definition: MQTTAsync.c:304
#define next(ls)
Definition: llex.c:32
struct MQTTAsync_connectData::@45 binarypwd
void MQTTProtocol_retry(START_TIME_TYPE now, int doRetry, int regardless)
unsigned int(* ssl_psk_cb)(const char *hint, char *identity, unsigned int max_identity_len, unsigned char *psk, unsigned int max_psk_len, void *u)
Definition: MQTTAsync.h:1113
int msgid
Definition: Clients.h:57
int qos
Definition: test6.c:56
int MQTTAsync_setBeforePersistenceWrite(MQTTAsync handle, void *context, MQTTPersistence_beforeWrite *co)
Definition: MQTTAsync.c:3222
MQTTProperties properties
Definition: MQTTPacket.h:206
MQTTAsync_token token
Definition: MQTTAsync.h:549
int MQTTAsync_isConnected(MQTTAsync handle)
Definition: MQTTAsync.c:3932
MQTTAsync_deliveryComplete * dc
Definition: MQTTAsync.c:373
static int MQTTAsync_connecting(MQTTAsyncs *m)
Definition: MQTTAsync.c:4377
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1393
void Socket_addPendingWrite(int socket)
Definition: Socket.c:577
static int clientStructCompare(void *a, void *b)
Definition: MQTTAsync.c:3315
void * payload
Definition: Clients.h:73
int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char *const *topic, int *qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4004
void MQTTTime_sleep(ELAPSED_TIME_TYPE milliseconds)
Definition: MQTTTime.c:27
int MQTTProtocol_unsubscribe(Clients *client, List *topics, int msgID, MQTTProperties *props)
MQTTAsync_token token
Definition: MQTTAsync.h:530
#define START_TIME_TYPE
Definition: MQTTTime.h:36
#define mutex_type
Definition: mutex_type.h:22
MQTTAsync_token token
Definition: MQTTAsync.h:714
struct MQTTAsync_connectOptions::@55 binarypwd
int retained
Definition: Clients.h:74
#define MQTTASYNC_NO_MORE_MSGIDS
Definition: MQTTAsync.h:159
Definition: Log.h:41
MQTTAsync_token token
Definition: MQTTAsync.c:321
ListElement * ListAppend(List *aList, void *content, size_t size)
Definition: LinkedList.c:90
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.c:319
#define PERSISTENCE_SEQNO_LIMIT
static void MQTTAsync_closeOnly(Clients *client, enum MQTTReasonCodes reasonCode, MQTTProperties *props)
Definition: MQTTAsync.c:3266
int SSLSocket_getPendingRead(void)
MQTTAsync_SSLOptions * ssl
Definition: MQTTAsync.h:1243
static int clientSockCompare(void *a, void *b)
Definition: MQTTAsync.c:536
#define MQTTASYNC_MAX_BUFFERED_MESSAGES
Definition: MQTTAsync.h:167
MQTTAsync_disconnected * disconnected
Definition: MQTTAsync.c:381
List * qoss
Definition: MQTTPacket.h:177
int MQTTPersistence_restoreMessageQueue(Clients *c)
int MQTTAsync_token
Definition: MQTTAsync.h:249
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
#define WEBSOCKET_IN_PROGRESS
Definition: Clients.h:106
void MQTTProtocol_closeSession(Clients *c, int sendwill)
Definition: MQTTAsync.c:3917
#define MQTTASYNC_BAD_STRUCTURE
Definition: MQTTAsync.h:151
MQTTAsync_command disconnect
Definition: MQTTAsync.c:390
static cond_type send_cond
Definition: MQTTAsync.c:271
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.c:320
MQTTAsync_createOptions * createOptions
Definition: MQTTAsync.c:399
static int MQTTAsync_cleanSession(Clients *client)
Definition: MQTTAsync.c:3328
MQTTAsync_willOptions * will
Definition: MQTTAsync.h:1214
static void MQTTAsync_retry(void)
Definition: MQTTAsync.c:4358
const char * privateKeyPassword
Definition: MQTTAsync.h:1061
MQTTSubscribe_options * optlist
Definition: MQTTAsync.c:333
#define MQTTASYNC_WRONG_MQTT_VERSION
Definition: MQTTAsync.h:185
#define URI_WS
Definition: MQTTAsync.c:68
const char * username
Definition: MQTTAsync.h:461
void MQTTAsync_disconnected(void *context, MQTTProperties *properties, enum MQTTReasonCodes reasonCode)
Definition: MQTTAsync.h:429
int maxInflightMessages
Definition: Clients.h:133
struct MQTTSubscribe_options MQTTSubscribe_options
const char * enabledCipherSuites
Definition: MQTTAsync.h:1071
MQTTProperties properties
Definition: MQTTAsync.h:1403
int MQTTPersistence_close(Clients *c)
#define MQTTCLIENT_PERSISTENCE_DEFAULT
int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
Definition: MQTTAsync.c:4803
MQTTAsync client
Definition: test6.c:276
static thread_id_type sendThread_id
Definition: MQTTAsync.c:128
int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen, const void *payload, int qos, int retained, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4230
int MQTTVersion
Definition: MQTTPacket.h:219
struct MQTTAsync_struct MQTTAsyncs
static void MQTTAsync_freeResponses(MQTTAsyncs *m)
Definition: MQTTAsync.c:2452
enum MQTTAsync_threadStates sendThread_state
Definition: MQTTAsync.c:126
int SSLSocket_close(networkHandles *net)
int MQTTProperties_hasProperty(MQTTProperties *props, enum MQTTPropertyCodes propid)
#define FUNC_ENTRY
Definition: StackTrace.h:55
unsigned char rc
Definition: MQTTPacket.h:153
unsigned int cleanstart
Definition: Clients.h:124
START_TIME_TYPE lastSent
Definition: Clients.h:81
#define MQTT_BAD_SUBSCRIBE
Definition: MQTTAsync.h:211
const char * message
Definition: MQTTAsync.h:538
unsigned int ping_outstanding
Definition: Clients.h:127
#define SECURE_MQTT_DEFAULT_PORT
static int retryLoopInterval
Definition: MQTTAsync.c:3465
MQTTProperties properties
Definition: MQTTAsync.h:730
void MQTTAsync_onSuccess(void *context, MQTTAsync_successData *response)
Definition: MQTTAsync.h:631
static void MQTTAsync_emptyMessageQueue(Clients *client)
Definition: MQTTAsync.c:2432
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
void * afterRead_context
Definition: Clients.h:144
void * clContext
Definition: MQTTAsync.c:374
const char * value
Definition: MQTTAsync.h:1150
char * MQTTStrdup(const char *src)
int Thread_wait_sem(sem_type sem, int timeout)
Definition: Thread.c:230
void MQTTAsync_connected(void *context, char *cause)
Definition: MQTTAsync.h:415
MQTTAsync_threadStates
Definition: MQTTAsync.c:121
static int MQTTAsync_completeConnection(MQTTAsyncs *m, Connack *connack)
Definition: MQTTAsync.c:2647
int SSLSocket_initialize(void)
void Heap_terminate(void)
Definition: Heap.c:417
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
static int MQTTAsync_persistCommand(MQTTAsync_queuedCommand *qcmd)
Definition: MQTTAsync.c:789
static int MQTTAsync_getNoBufferedMessages(MQTTAsyncs *m)
Definition: MQTTAsync.c:4219
int MQTTProtocol_handlePingresps(void *pack, int sock)
#define WINAPI
Definition: MQTTAsync.c:298
void MQTTAsync_global_init(MQTTAsync_init_options *inits)
Definition: MQTTAsync.c:79
int Heap_initialize(void)
Definition: Heap.c:406
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
#define PERSISTENCE_MAX_KEY_LENGTH
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
Definition: MQTTAsync.c:4897
static pthread_mutex_t mqttasync_mutex_store
Definition: MQTTAsync.c:261
static void MQTTAsync_lock_mutex(mutex_type amutex)
Definition: MQTTAsync.c:543
int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const MQTTAsync_message *message, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4328
int MQTTPersistence_unpersistQueueEntry(Clients *client, MQTTPersistence_qEntry *qe)
int messageIDCompare(void *a, void *b)
MQTTPacket * pack
Definition: MQTTAsync.c:396
int MQTTPersistence_create(MQTTClient_persistence **persistence, int type, void *pcontext)
const MQTTAsync_nameValue * httpHeaders
Definition: MQTTAsync.h:1331
MQTTSubscribe_options opts
Definition: MQTTAsync.c:332
pthread_cond_t cond
Definition: Thread.h:46
int retained
Definition: test6.c:57
dictionary context
Definition: test2.py:57
void MQTTProtocol_emptyMessageList(List *msgList)
#define NOT_IN_PROGRESS
Definition: Clients.h:100
pthread_mutex_t mutex
Definition: Thread.h:46
#define thread_return_type
Definition: Thread.h:44
const void * ptr(const T *p)
Definition: format.h:3610
const char * privateKey
Definition: MQTTAsync.h:1058
MQTTAsync_command command
Definition: MQTTAsync.c:426
thread_id_type Thread_getid(void)
Definition: Thread.c:176
const void * password
Definition: Clients.h:122
int Log_initialize(Log_nameValue *info)
Definition: Log.c:132
#define URI_SSL
Definition: MQTTClient.c:68
static thread_id_type receiveThread_id
Definition: MQTTAsync.c:129
struct MQTTAsync_successData::@46::@47 pub
MQTTAsyncs * client
Definition: MQTTAsync.c:427
union MQTTAsync_command::@39 details
char * topic
Definition: Clients.h:71
void * phandle
Definition: Clients.h:139
START_TIME_TYPE MQTTTime_now(void)
Definition: MQTTTime.c:66
sem_t * sem_type
Definition: Thread.h:53
#define MQTTASYNC_TRUE
Definition: MQTTAsync.h:1591
enum MQTTReasonCodes reasonCode
Definition: MQTTAsync.h:532
static int cmpkeys(const void *p1, const void *p2)
Definition: MQTTAsync.c:1220
int MQTTProtocol_handlePubrels(void *pack, int sock)
MQTTAsync_message * msg
Definition: MQTTAsync.c:308
static volatile int global_initialized
Definition: MQTTAsync.c:301
#define MQTTAsync_failureData5_initializer
Definition: MQTTAsync.h:543
int MQTTAsync_reconnect(MQTTAsync handle)
Definition: MQTTAsync.c:1545
#define MQTTASYNC_PERSISTENCE_ERROR
Definition: MQTTAsync.h:122
#define WS_DEFAULT_PORT
int ListRemoveHead(List *aList)
Definition: LinkedList.c:294
#define TCP_IN_PROGRESS
Definition: Clients.h:102
MQTTProperties * willProperties
Definition: MQTTAsync.h:1315
unsigned int seqno
Definition: MQTTAsync.c:428
static int tostop
Definition: MQTTAsync.c:303
int MQTTProtocol_subscribe(Clients *client, List *topics, List *qoss, int msgID, MQTTSubscribe_options *opts, MQTTProperties *props)
static ClientStates ClientState
Definition: MQTTAsync.c:111
const char * MQTTAsync_strerror(int code)
Definition: MQTTAsync.c:4944
static int MQTTAsync_unpersistCommandsAndMessages(Clients *c)
Definition: MQTTAsync.c:1323
static void MQTTAsync_writeComplete(int socket, int rc)
Definition: MQTTAsync.c:1730
static void setRetryLoopInterval(int keepalive)
Definition: MQTTAsync.c:3467
MQTTSubscribe_options subscribeOptions
Definition: MQTTAsync.h:734
int MQTTAsync_init(void)
Definition: MQTTAsync.c:273
static MQTTAsync_queuedCommand * MQTTAsync_restoreCommand(char *buffer, int buflen, int MQTTVersion, MQTTAsync_queuedCommand *)
Definition: MQTTAsync.c:961
void MQTTAsync_onFailure(void *context, MQTTAsync_failureData *response)
Definition: MQTTAsync.h:662
#define PERSISTENCE_QUEUE_KEY
static void MQTTAsync_insertInOrder(List *list, void *content, int size, struct keyloc *keyloc_array, int array_size)
Definition: MQTTAsync.c:1155
List * ListInitialize(void)
Definition: LinkedList.c:52
int Thread_signal_cond(cond_type condvar)
Definition: Thread.c:399
#define MQTTASYNC_BAD_PROTOCOL
Definition: MQTTAsync.h:177
#define ELAPSED_TIME_TYPE
Definition: MQTTTime.h:40
Header header
Definition: MQTTPacket.h:199
#define WAIT_FOR_CONNACK
Definition: Clients.h:108
int WebSocket_connect(networkHandles *net, const char *uri)
Definition: WebSocket.c:383
char * destinationName
Definition: MQTTAsync.c:342
void Log_traceCallback(enum LOG_LEVELS level, const char *message)
Definition: Log.h:81
int MQTTProtocol_handleSubacks(void *pack, int sock)
char * clientID
Definition: Clients.h:119
MQTTAsync_token token
Definition: MQTTAsync.h:514
thread_type Thread_start(thread_fn fn, void *parameter)
Definition: Thread.c:60
#define thread_id_type
Definition: Thread.h:43
size_t MQTTProtocol_addressPort(const char *uri, int *port, const char **topic, int default_port)
const void * data
Definition: MQTTAsync.h:1010
#define PERSISTENCE_PUBREL
void MQTTAsync_onFailure5(void *context, MQTTAsync_failureData5 *response)
Definition: MQTTAsync.h:677
void ListFreeNoContent(List *aList)
Definition: LinkedList.c:392
int MQTTProtocol_handlePubrecs(void *pack, int sock)
int noBufferedMessages
Definition: MQTTAsync.c:401
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.c:318
#define TCPSOCKET_COMPLETE
Definition: Socket.h:73
static int MQTTAsync_assignMsgId(MQTTAsyncs *m)
Definition: MQTTAsync.c:3960
int MQTTAsync_setMessageArrivedCallback(MQTTAsync handle, void *context, MQTTAsync_messageArrived *ma)
Definition: MQTTAsync.c:3111
MQTTPersistence_afterRead * afterRead
Definition: Clients.h:142
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:1413
int MQTTProtocol_startPublish(Clients *pubclient, Publish *publish, int qos, int retained, Messages **mm)
static void MQTTAsync_stop(void)
Definition: MQTTAsync.c:3017
int MQTTVersion
Definition: MQTTPacket.h:205
MQTTProperties properties
Definition: MQTTPacket.h:220
int MQTTAsync_randomJitter(int currentIntervalBase, int minInterval, int maxInterval)
Definition: MQTTAsync.c:496
static mutex_type socket_mutex
Definition: MQTTAsync.c:265
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
Definition: LinkedList.c:154
#define MQTTSubscribe_options_initializer
ListElement * first
Definition: LinkedList.h:69
#define PERSISTENCE_V5_PUBREL
int seqno
Definition: MQTTAsync.c:1145
static void MQTTAsync_freeCommands(MQTTAsyncs *m)
Definition: MQTTAsync.c:2500
List * inboundMsgs
Definition: Clients.h:135
char *const * serverURIs
Definition: MQTTAsync.h:1277
enum MQTTReasonCodes reasonCode
Definition: MQTTAsync.h:1407
void Socket_close(int socket)
Definition: Socket.c:627
int retryInterval
Definition: Clients.h:132
MQTTProperties properties
Definition: MQTTAsync.c:324
ListElement * ListFind(List *aList, void *content)
Definition: LinkedList.c:140
dictionary data
Definition: mqtt_test.py:22
int currentIntervalBase
Definition: MQTTAsync.c:412
static void MQTTAsync_startConnectRetry(MQTTAsyncs *m)
Definition: MQTTAsync.c:1526
static int MQTTAsync_restoreCommands(MQTTAsyncs *client)
Definition: MQTTAsync.c:1229
MQTTProtocol state
Definition: MQTTAsync.c:119
#define MQTTASYNC_TOPICNAME_TRUNCATED
Definition: MQTTAsync.h:146
#define MQTTAsync_message_initializer
Definition: MQTTAsync.h:319
static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout)
Definition: MQTTAsync.c:3908
char * topics[]
static int MQTTAsync_addCommand(MQTTAsync_queuedCommand *command, int command_size)
Definition: MQTTAsync.c:1420
static int MQTTAsync_deliverMessage(MQTTAsyncs *m, char *topicName, size_t topicLen, MQTTAsync_message *mm)
Definition: MQTTAsync.c:3381
#define MQTTVERSION_3_1
Definition: MQTTAsync.h:199
void ListFree(List *aList)
Definition: LinkedList.c:381
enum MQTTReasonCodes rc
Definition: test10.c:1112
const char * CApath
Definition: MQTTAsync.h:1094
#define PERSISTENCE_PUBLISH_RECEIVED
struct MQTTAsync_command::@39::@42 pub
int MQTTPersistence_afterRead(void *context, char **buffer, int *buflen)
int MQTTPersistence_persistQueueEntry(Clients *aclient, MQTTPersistence_qEntry *qe)
void Socket_outInitialize(void)
Definition: Socket.c:122
int MQTTAsync_setUpdateConnectOptions(MQTTAsync handle, void *context, MQTTAsync_updateConnectOptions *updateOptions)
Definition: MQTTAsync.c:3200
struct MQTTAsync_command::@39::@43 dis
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
ELAPSED_TIME_TYPE MQTTTime_elapsed(START_TIME_TYPE milliseconds)
Definition: MQTTTime.c:109
const char * username
Definition: Clients.h:120
struct MQTTAsync_successData5::@49::@51 pub
MQTTAsync_command connect
Definition: MQTTAsync.c:389
#define MQTTVERSION_DEFAULT
Definition: MQTTAsync.h:195
MQTTAsync_connected * connected
Definition: MQTTAsync.c:378
MQTTProperties * willProps
Definition: MQTTAsync.c:419
void MQTTAsync_deliveryComplete(void *context, MQTTAsync_token token)
Definition: MQTTAsync.h:377
struct MQTTAsync_successData5::@49::@50 sub
char * http_proxy
Definition: Clients.h:90
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:1321
unsigned int qos
Definition: MQTTPacket.h:78
int MQTTPersistence_initialize(Clients *c, const char *serverURI)
#define MQTTASYNC_MAX_MESSAGES_INFLIGHT
Definition: MQTTAsync.h:132
static void MQTTAsync_closeSession(Clients *client, enum MQTTReasonCodes reasonCode, MQTTProperties *props)
Definition: MQTTAsync.c:3296
#define TCPSOCKET_INTERRUPTED
Definition: Socket.h:79
#define MQTTASYNC_0_LEN_WILL_TOPIC
Definition: MQTTAsync.h:189
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.c:317
unsigned int cleansession
Definition: Clients.h:123
static int MQTTAsync_unpersistCommand(MQTTAsync_queuedCommand *qcmd)
Definition: MQTTAsync.c:772
MQTTAsync_command * pending_write
Definition: MQTTAsync.c:391
void Log_terminate(void)
Definition: Log.c:232
char struct_id[4]
Definition: MQTTAsync.h:260
int len
Definition: utf-8.c:46
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1387
int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
Definition: MQTTAsync.c:4848
int MQTTPersistence_beforeWrite(void *context, int bufcount, char *buffers[], int buflens[])
int(* ssl_error_cb)(const char *str, size_t len, void *u)
Definition: MQTTAsync.h:1100
#define SSL_FATAL
Definition: Socket.h:80
Clients * c
Definition: MQTTAsync.c:368
MQTTProperties * connectProperties
Definition: MQTTAsync.h:1311
struct Options options
willMessages * will
Definition: Clients.h:134
#define MQTTProperties_initializer
const unsigned char * protos
Definition: MQTTAsync.h:1135


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:09