test4.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs - MQTT 3.1.1 support
16  * Ian Craggs - test8 - failure callbacks
17  *******************************************************************************/
18 
19 
26 #include "MQTTAsync.h"
27 #include <string.h>
28 #include <stdlib.h>
29 
30 #if !defined(_WINDOWS)
31  #include <sys/time.h>
32  #include <sys/socket.h>
33  #include <unistd.h>
34  #include <errno.h>
35 #else
36  #include <windows.h>
37 #endif
38 
39 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
40 
41 void usage(void)
42 {
43  printf("help!!\n");
44  exit(EXIT_FAILURE);
45 }
46 
47 struct Options
48 {
49  char* connection;
50  int verbose;
51  int test_no;
52  int size;
53  int MQTTVersion;
54  int iterations;
55 } options =
56 {
57  "mqtt.eclipse.org:1883",
58  0,
59  -1,
60  10000,
62  1,
63 };
64 
65 void getopts(int argc, char** argv)
66 {
67  int count = 1;
68 
69  while (count < argc)
70  {
71  if (strcmp(argv[count], "--test_no") == 0)
72  {
73  if (++count < argc)
74  options.test_no = atoi(argv[count]);
75  else
76  usage();
77  }
78  else if (strcmp(argv[count], "--size") == 0)
79  {
80  if (++count < argc)
81  options.size = atoi(argv[count]);
82  else
83  usage();
84  }
85  else if (strcmp(argv[count], "--connection") == 0)
86  {
87  if (++count < argc)
88  options.connection = argv[count];
89  else
90  usage();
91  }
92  else if (strcmp(argv[count], "--MQTTversion") == 0)
93  {
94  if (++count < argc)
95  {
96  options.MQTTVersion = atoi(argv[count]);
97  printf("setting MQTT version to %d\n", options.MQTTVersion);
98  }
99  else
100  usage();
101  }
102  else if (strcmp(argv[count], "--iterations") == 0)
103  {
104  if (++count < argc)
105  options.iterations = atoi(argv[count]);
106  else
107  usage();
108  }
109  else if (strcmp(argv[count], "--verbose") == 0)
110  options.verbose = 1;
111  count++;
112  }
113 }
114 
115 #define LOGA_DEBUG 0
116 #define LOGA_INFO 1
117 #include <stdarg.h>
118 #include <time.h>
119 #include <sys/timeb.h>
120 void MyLog(int LOGA_level, char* format, ...)
121 {
122  static char msg_buf[256];
123  va_list args;
124 #if defined(_WIN32) || defined(_WINDOWS)
125  struct timeb ts;
126 #else
127  struct timeval ts;
128 #endif
129  struct tm timeinfo;
130 
131  if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
132  return;
133 
134 #if defined(_WIN32) || defined(_WINDOWS)
135  ftime(&ts);
136  localtime_s(&timeinfo, &ts.time);
137 #else
138  gettimeofday(&ts, NULL);
139  localtime_r(&ts.tv_sec, &timeinfo);
140 #endif
141  strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
142 
143 #if defined(_WIN32) || defined(_WINDOWS)
144  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
145 #else
146  sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000);
147 #endif
148 
149  va_start(args, format);
150  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
151  va_end(args);
152 
153  printf("%s\n", msg_buf);
154  fflush(stdout);
155 }
156 
157 
158 #if defined(_WIN32) || defined(_WINDOWS)
159 #define mqsleep(A) Sleep(1000*A)
160 #define START_TIME_TYPE DWORD
161 static DWORD start_time = 0;
163 {
164  return GetTickCount();
165 }
166 #elif defined(AIX)
167 #define mqsleep sleep
168 #define START_TIME_TYPE struct timespec
170 {
171  static struct timespec start;
172  clock_gettime(CLOCK_REALTIME, &start);
173  return start;
174 }
175 #else
176 #define mqsleep sleep
177 #define START_TIME_TYPE struct timeval
178 /* TODO - unused - remove? static struct timeval start_time; */
180 {
181  struct timeval start_time;
182  gettimeofday(&start_time, NULL);
183  return start_time;
184 }
185 #endif
186 
187 
188 #if defined(_WIN32)
189 long elapsed(START_TIME_TYPE start_time)
190 {
191  return GetTickCount() - start_time;
192 }
193 #elif defined(AIX)
194 #define assert(a)
195 long elapsed(struct timespec start)
196 {
197  struct timespec now, res;
198 
199  clock_gettime(CLOCK_REALTIME, &now);
200  ntimersub(now, start, res);
201  return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
202 }
203 #else
204 long elapsed(START_TIME_TYPE start_time)
205 {
206  struct timeval now, res;
207 
208  gettimeofday(&now, NULL);
209  timersub(&now, &start_time, &res);
210  return (res.tv_sec)*1000 + (res.tv_usec)/1000;
211 }
212 #endif
213 
214 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
215 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
216 
217 int tests = 0;
218 int failures = 0;
219 FILE* xml;
221 char output[3000];
223 
225 {
226  long duration = elapsed(global_start_time);
227 
228  fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
229  if (cur_output != output)
230  {
231  fprintf(xml, "%s", output);
232  cur_output = output;
233  }
234  fprintf(xml, "</testcase>\n");
235 }
236 
237 void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
238 {
239  ++tests;
240  if (!value)
241  {
242  va_list args;
243 
244  ++failures;
245  printf("Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
246 
247  va_start(args, format);
248  vprintf(format, args);
249  va_end(args);
250 
251  cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
252  description, filename, lineno);
253  }
254  else
255  MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
256 }
257 
258 volatile int test_finished = 0;
259 
260 char* test_topic = "async test topic";
261 
262 
264 {
265  MQTTAsync c = (MQTTAsync)context;
266  MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
267  test_finished = 1;
268 }
269 
270 
272 {
273  MQTTAsync c = (MQTTAsync)context;
275  int rc;
276 
277  MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c);
279  opts.context = c;
280 
281  rc = MQTTAsync_disconnect(c, &opts);
282  assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
283 }
284 
285 
286 int test1_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
287 {
288  MQTTAsync c = (MQTTAsync)context;
289  static int message_count = 0;
290  int rc;
291 
292  MyLog(LOGA_DEBUG, "In messageArrived callback %p", c);
293 
294  if (++message_count == 1)
295  {
298  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
299  pubmsg.payloadlen = 11;
300  pubmsg.qos = 2;
301  pubmsg.retained = 0;
302  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
303  assert("Good rc from send", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
304  }
305  else
306  {
308 
310  opts.context = c;
311  rc = MQTTAsync_unsubscribe(c, test_topic, &opts);
312  assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
313  }
314 
315  MQTTAsync_freeMessage(&message);
316  MQTTAsync_free(topicName);
317 
318  return 1;
319 }
320 
322 {
323  MQTTAsync c = (MQTTAsync)context;
325  int rc;
326 
327  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
328 
329  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
330  pubmsg.payloadlen = 11;
331  pubmsg.qos = 2;
332  pubmsg.retained = 0;
333 
334  rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
335  assert("Good rc from send", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
336 }
337 
338 
340 {
341  MQTTAsync c = (MQTTAsync)context;
343  int rc;
344 
345  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
347  opts.context = c;
348 
349  rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
350  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
351  if (rc != MQTTASYNC_SUCCESS)
352  test_finished = 1;
353 }
354 
355 
356 /*********************************************************************
357 
358 Test1: Basic connect, subscribe send and receive.
359 
360 *********************************************************************/
361 int test1(struct Options options)
362 {
363  int subsqos = 2;
364  MQTTAsync c;
367  int rc = 0;
368  char* test_topic = "C client test1";
369 
370  MyLog(LOGA_INFO, "Starting test 1 - asynchronous connect");
371  fprintf(xml, "<testcase classname=\"test4\" name=\"asynchronous connect\"");
373 
374  rc = MQTTAsync_create(&c, options.connection, "async_test",
376  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
377  if (rc != MQTTASYNC_SUCCESS)
378  {
379  MQTTAsync_destroy(&c);
380  goto exit;
381  }
382 
383  rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
384  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
385 
386  opts.keepAliveInterval = 20;
387  opts.cleansession = 1;
388  opts.username = "testuser";
389  opts.password = "testpassword";
390  opts.MQTTVersion = options.MQTTVersion;
391 
392  opts.will = &wopts;
393  opts.will->message = "will message";
394  opts.will->qos = 1;
395  opts.will->retained = 0;
396  opts.will->topicName = "will topic";
397  opts.will = NULL;
398  opts.onSuccess = test1_onConnect;
399  opts.onFailure = NULL;
400  opts.context = c;
401 
402  MyLog(LOGA_DEBUG, "Connecting");
403  rc = MQTTAsync_connect(c, &opts);
404  rc = 0;
405  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
406  if (rc != MQTTASYNC_SUCCESS)
407  goto exit;
408 
409  while (!test_finished)
410  #if defined(_WIN32)
411  Sleep(100);
412  #else
413  usleep(10000L);
414  #endif
415 
416  MQTTAsync_destroy(&c);
417 
418 exit:
419  MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
420  (failures == 0) ? "passed" : "failed", tests, failures);
422  return failures;
423 }
424 
426 
428 {
429  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
430 
432  test_finished = 1;
433 }
434 
435 
437 {
438 
439  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p\n", context);
440 
441  assert("Connect should not succeed", 0, "connect success callback was called", 0);
442 
443  test_finished = 1;
444 }
445 
446 /*********************************************************************
447 
448 Test2: connect timeout
449 
450 *********************************************************************/
451 int test2(struct Options options)
452 {
453  int subsqos = 2;
454  MQTTAsync c;
457  int rc = 0;
458  char* test_topic = "C client test2";
459 
460  test_finished = 0;
461 
462  MyLog(LOGA_INFO, "Starting test 2 - connect timeout");
463  fprintf(xml, "<testcase classname=\"test4\" name=\"connect timeout\"");
465 
466  rc = MQTTAsync_create(&c, "tcp://9.20.96.160:66", "connect timeout",
468  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
469  if (rc != MQTTASYNC_SUCCESS)
470  {
471  MQTTAsync_destroy(&c);
472  goto exit;
473  }
474 
475  rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
476  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
477 
478  opts.connectTimeout = 5;
479  opts.keepAliveInterval = 20;
480  opts.cleansession = 1;
481  opts.username = "testuser";
482  opts.binarypwd.data = "testpassword";
483  opts.binarypwd.len = (int)strlen(opts.binarypwd.data);
484  opts.MQTTVersion = options.MQTTVersion;
485 
486  opts.will = &wopts;
487  opts.will->message = "will message";
488  opts.will->qos = 1;
489  opts.will->retained = 0;
490  opts.will->topicName = "will topic";
491  opts.will = NULL;
492  opts.onSuccess = test2_onConnect;
493  opts.onFailure = test2_onFailure;
494  opts.context = c;
495 
496  MyLog(LOGA_DEBUG, "Connecting");
497  rc = MQTTAsync_connect(c, &opts);
498  rc = 0;
499  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
500  if (rc != MQTTASYNC_SUCCESS)
501  goto exit;
502 
503  while (!test_finished)
504  #if defined(_WIN32)
505  Sleep(100);
506  #else
507  usleep(10000L);
508  #endif
509 
510  MQTTAsync_destroy(&c);
511 
512 exit:
513  assert("Connect onFailure should be called once", test2_onFailure_called == 1,
514  "connect onFailure was called %d times", test2_onFailure_called);
515 
516  MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.",
517  (failures == 0) ? "passed" : "failed", tests, failures);
519  return failures;
520 }
521 
522 
523 typedef struct
524 {
526  int index;
527  char clientid[24];
528  char test_topic[100];
530 } client_data;
531 
532 
534 {
535  client_data* cd = (client_data*)context;
536  MyLog(LOGA_DEBUG, "In onDisconnect callback for client \"%s\"", cd->clientid);
537  test_finished++;
538 }
539 
540 
542 {
543  client_data* cd = (client_data*)context;
544  MyLog(LOGA_DEBUG, "In QoS 0 onPublish callback for client \"%s\"", cd->clientid);
545 }
546 
547 
549 {
550  client_data* cd = (client_data*)context;
552  int rc;
553 
554  MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback \"%s\"", cd->clientid);
556  opts.context = cd;
557 
558  rc = MQTTAsync_disconnect(cd->c, &opts);
559  assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
560 }
561 
562 
563 int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
564 {
565  client_data* cd = (client_data*)context;
566  int rc;
567 
568  MyLog(LOGA_DEBUG, "In messageArrived callback \"%s\" message count ", cd->clientid);
569 
570  if (++cd->message_count == 1)
571  {
574  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
575  pubmsg.payloadlen = 25;
576  pubmsg.qos = 1;
577  pubmsg.retained = 0;
578  rc = MQTTAsync_sendMessage(cd->c, cd->test_topic, &pubmsg, &opts);
579  assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
580  }
581  else if (cd->message_count == 2)
582  {
585  pubmsg.payload = "a QoS 0 message that we can shorten to the extent that we need to payload up to 11";
586  pubmsg.payloadlen = 29;
587  pubmsg.qos = 0;
588  pubmsg.retained = 0;
589  opts.context = cd;
590  opts.onSuccess = test3_onPublish;
591 
592  rc = MQTTAsync_sendMessage(cd->c, cd->test_topic, &pubmsg, &opts);
593  assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
594  }
595  else
596  {
598 
600  opts.context = cd;
601  rc = MQTTAsync_unsubscribe(cd->c, cd->test_topic, &opts);
602  assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
603  }
604  MQTTAsync_freeMessage(&message);
605  MQTTAsync_free(topicName);
606  return 1;
607 }
608 
610 {
611  client_data* cd = (client_data*)context;
613  int rc;
614 
615  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback \"%s\"", cd->clientid);
616 
617  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
618  pubmsg.payloadlen = 11;
619  pubmsg.qos = 2;
620  pubmsg.retained = 0;
621 
622  rc = MQTTAsync_send(cd->c, cd->test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
623  assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
624 }
625 
626 
628 {
629  client_data* cd = (client_data*)context;
631  int rc;
632 
633  MyLog(LOGA_DEBUG, "In connect onSuccess callback, \"%s\"", cd->clientid);
635  opts.context = cd;
636 
637  rc = MQTTAsync_subscribe(cd->c, cd->test_topic, 2, &opts);
638  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
639  if (rc != MQTTASYNC_SUCCESS)
640  test_finished++;
641 }
642 
643 
645 {
646  client_data* cd = (client_data*)context;
648 
649  assert("Should have connected", 0, "%s failed to connect\n", cd->clientid);
650  MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\" rc %d\n", cd->clientid, response ? response->code : -999);
651  if (response && response->message)
652  MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\"\n", response->message);
653 
654  test_finished++;
655 }
656 
657 
658 /*********************************************************************
659 
660 Test3: More than one client object - simultaneous working.
661 
662 *********************************************************************/
663 int test3(struct Options options)
664 {
665  #define num_clients 10
666  int subsqos = 2;
669  int rc = 0;
670  int i;
671  client_data clientdata[num_clients];
672 
673  test_finished = 0;
674  MyLog(LOGA_INFO, "Starting test 3 - multiple connections");
675  fprintf(xml, "<testcase classname=\"test4\" name=\"multiple connections\"");
677 
678  for (i = 0; i < num_clients; ++i)
679  {
680  sprintf(clientdata[i].clientid, "async_test3_num_%d", i);
681  sprintf(clientdata[i].test_topic, "async test3 topic num %d", i);
682  clientdata[i].index = i;
683  clientdata[i].message_count = 0;
684 
685  rc = MQTTAsync_create(&(clientdata[i].c), options.connection, clientdata[i].clientid,
687  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
688 
689  rc = MQTTAsync_setCallbacks(clientdata[i].c, &clientdata[i], NULL, test3_messageArrived, NULL);
690  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
691 
692  opts.keepAliveInterval = 20;
693  opts.cleansession = 1;
694  opts.username = "testuser";
695  opts.password = "testpassword";
696  opts.MQTTVersion = options.MQTTVersion;
697 
698  opts.will = &wopts;
699  opts.will->message = "will message";
700  opts.will->qos = 1;
701  opts.will->retained = 0;
702  opts.will->topicName = "will topic";
703  opts.onSuccess = test3_onConnect;
704  opts.onFailure = test3_onFailure;
705  opts.context = &clientdata[i];
706 
707  MyLog(LOGA_DEBUG, "Connecting");
708  rc = MQTTAsync_connect(clientdata[i].c, &opts);
709  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
710  }
711 
712  while (test_finished < num_clients)
713  {
714  MyLog(LOGA_DEBUG, "num_clients %d test_finished %d\n", num_clients, test_finished);
715  #if defined(_WIN32)
716  Sleep(100);
717  #else
718  usleep(10000L);
719  #endif
720  }
721 
722  MyLog(LOGA_DEBUG, "TEST3: destroying clients");
723 
724  for (i = 0; i < num_clients; ++i)
725  MQTTAsync_destroy(&clientdata[i].c);
726 
727 //exit:
728  MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
729  (failures == 0) ? "passed" : "failed", tests, failures);
731  return failures;
732 }
733 
734 
735 void* test4_payload = NULL;
737 
739 {
740  MQTTAsync c = (MQTTAsync)context;
741 
742  MyLog(LOGA_DEBUG, "In publish onSuccess callback, context %p", context);
743 }
744 
745 int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
746 {
747  MQTTAsync c = (MQTTAsync)context;
748  static int message_count = 0;
749  int rc, i;
750 
751  MyLog(LOGA_DEBUG, "In messageArrived callback %p", c);
752 
753  assert("Message size correct", message->payloadlen == test4_payloadlen,
754  "message size was %d", message->payloadlen);
755 
756  for (i = 0; i < options.size; ++i)
757  {
758  if (((char*)test4_payload)[i] != ((char*)message->payload)[i])
759  {
760  assert("Message contents correct", ((char*)test4_payload)[i] != ((char*)message->payload)[i],
761  "message content was %c", ((char*)message->payload)[i]);
762  break;
763  }
764  }
765 
766  if (++message_count == 1)
767  {
770 
771  pubmsg.payload = test4_payload;
772  pubmsg.payloadlen = test4_payloadlen;
773  pubmsg.qos = 1;
774  pubmsg.retained = 0;
775  opts.onSuccess = test4_onPublish;
776  opts.context = c;
777 
778  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
779  }
780  else if (message_count == 2)
781  {
784 
785  pubmsg.payload = test4_payload;
786  pubmsg.payloadlen = test4_payloadlen;
787  pubmsg.qos = 0;
788  pubmsg.retained = 0;
789  opts.onSuccess = test4_onPublish;
790  opts.context = c;
791  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
792  }
793  else
794  {
796 
798  opts.context = c;
799  rc = MQTTAsync_unsubscribe(c, test_topic, &opts);
800  assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
801  }
802 
803  MQTTAsync_freeMessage(&message);
804  MQTTAsync_free(topicName);
805 
806  return 1;
807 }
808 
809 
811 {
812  MQTTAsync c = (MQTTAsync)context;
814  int rc, i;
815 
816  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p", c);
817 
820 
821  srand(33);
822  for (i = 0; i < options.size; ++i)
823  ((char*)pubmsg.payload)[i] = rand() % 256;
824 
825  pubmsg.qos = 2;
826  pubmsg.retained = 0;
827 
828  rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
829 }
830 
831 
833 {
834  MQTTAsync c = (MQTTAsync)context;
836  int rc;
837 
838  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
840  opts.context = c;
841 
842  rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
843  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
844  if (rc != MQTTASYNC_SUCCESS)
845  test_finished = 1;
846 }
847 
848 
849 /*********************************************************************
850 
851 Test4: Send and receive big messages
852 
853 *********************************************************************/
854 int test4(struct Options options)
855 {
856  int subsqos = 2;
857  MQTTAsync c;
860  int rc = 0;
861  char* test_topic = "C client test4";
862 
863  test_finished = failures = 0;
864  MyLog(LOGA_INFO, "Starting test 4 - big messages");
865  fprintf(xml, "<testcase classname=\"test4\" name=\"big messages\"");
867 
868  rc = MQTTAsync_create(&c, options.connection, "async_test_4",
870  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
871  if (rc != MQTTASYNC_SUCCESS)
872  {
873  MQTTAsync_destroy(&c);
874  goto exit;
875  }
876 
877  rc = MQTTAsync_setCallbacks(c, c, NULL, test4_messageArrived, NULL);
878  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
879 
880  opts.keepAliveInterval = 20;
881  opts.cleansession = 1;
882  opts.username = "testuser";
883  opts.password = "testpassword";
884  opts.MQTTVersion = options.MQTTVersion;
885 
886  opts.will = &wopts;
887  opts.will->message = "will message";
888  opts.will->qos = 1;
889  opts.will->retained = 0;
890  opts.will->topicName = "will topic";
891  opts.will = NULL;
892  opts.onSuccess = test4_onConnect;
893  opts.onFailure = NULL;
894  opts.context = c;
895 
896  MyLog(LOGA_DEBUG, "Connecting");
897  rc = MQTTAsync_connect(c, &opts);
898  rc = 0;
899  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
900  if (rc != MQTTASYNC_SUCCESS)
901  goto exit;
902 
903  while (!test_finished)
904  #if defined(_WIN32)
905  Sleep(100);
906  #else
907  usleep(1000L);
908  #endif
909 
910  MQTTAsync_destroy(&c);
911 
912 exit:
913  MyLog(LOGA_INFO, "TEST4: test %s. %d tests run, %d failures.",
914  (failures == 0) ? "passed" : "failed", tests, failures);
916  return failures;
917 }
918 
919 
921 {
922  MQTTAsync c = (MQTTAsync)context;
924 
925  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
926 
927  MyLog(LOGA_INFO, "Connack rc is %d", response ? response->code : -999);
928 
929  test_finished = 1;
930 }
931 
932 
934 {
935  MQTTAsync c = (MQTTAsync)context;
937 
938  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
939 
940  test_finished = 1;
941 }
942 
943 
944 /********************************************************************
945 
946 Test5: Connack return codes
947 
948 *********************************************************************/
949 int test5(struct Options options)
950 {
951  int subsqos = 2;
952  MQTTAsync c;
955  int rc = 0;
956  char* test_topic = "C client test1";
957 
958  test_finished = failures = 0;
959  MyLog(LOGA_INFO, "Starting test 5 - connack return codes");
960  fprintf(xml, "<testcase classname=\"test4\" name=\"connack return codes\"");
962 
963  rc = MQTTAsync_create(&c, options.connection, "a clientid that is too long to be accepted",
965  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
966  if (rc != MQTTASYNC_SUCCESS)
967  {
968  MQTTAsync_destroy(&c);
969  goto exit;
970  }
971 
972  rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
973  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
974 
975  opts.onSuccess = test5_onConnect;
977  opts.context = c;
978 
979  MyLog(LOGA_DEBUG, "Connecting");
980  rc = MQTTAsync_connect(c, &opts);
981  rc = 0;
982  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
983  if (rc != MQTTASYNC_SUCCESS)
984  goto exit;
985 
986  while (!test_finished)
987  #if defined(_WIN32)
988  Sleep(100);
989  #else
990  usleep(10000L);
991  #endif
992 
993  MQTTAsync_destroy(&c);
994 
995 exit:
996  MyLog(LOGA_INFO, "TEST5: test %s. %d tests run, %d failures.",
997  (failures == 0) ? "passed" : "failed", tests, failures);
999  return failures;
1000 }
1001 
1002 
1003 typedef struct
1004 {
1008 
1010 {
1011  test6_client_info cinfo = *(test6_client_info*)context;
1012 
1013  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
1014 
1015  if (response)
1016  MyLog(LOGA_INFO, "Connack rc is %d", response->code);
1017 
1018  assert("Should fail to connect", cinfo.should_fail, "should_fail was %d\n", cinfo.should_fail);
1019 
1020  test_finished = 1;
1021 }
1022 
1023 
1025 {
1026  test6_client_info cinfo = *(test6_client_info*)context;
1027 
1028  MyLog(LOGA_DEBUG, "In connect success callback, context %p", context);
1029 
1030  assert("Should connect correctly", !cinfo.should_fail, "should_fail was %d\n", cinfo.should_fail);
1031 
1032  test_finished = 1;
1033 }
1034 
1035 
1037 {
1038  test6_client_info cinfo = *(test6_client_info*)context;
1039 
1040  MyLog(LOGA_DEBUG, "In onDisconnect callback %p", cinfo.c);
1041  test_finished = 1;
1042 }
1043 
1044 
1045 /********************************************************************
1046 
1047 Test6: HA connections
1048 
1049 *********************************************************************/
1050 int test6(struct Options options)
1051 {
1052  int subsqos = 2;
1053  test6_client_info cinfo;
1057  int rc = 0;
1058  char* test_topic = "C client test1";
1059  char* uris[2] = {options.connection, options.connection};
1060 
1061  failures = 0;
1062  MyLog(LOGA_INFO, "Starting test 6 - HA connections");
1063  fprintf(xml, "<testcase classname=\"test4\" name=\"HA connections\"");
1065 
1066  test_finished = 0;
1067  cinfo.should_fail = 1; /* fail to connect */
1068  rc = MQTTAsync_create(&cinfo.c, "tcp://rubbish:1883", "async ha connection",
1070  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
1071  if (rc != MQTTASYNC_SUCCESS)
1072  {
1073  MQTTAsync_destroy(&cinfo.c);
1074  goto exit;
1075  }
1076 
1077  rc = MQTTAsync_setCallbacks(cinfo.c, cinfo.c, NULL, test1_messageArrived, NULL);
1078  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1079 
1080  opts.onSuccess = test6_onConnect;
1082  opts.context = &cinfo;
1083  opts.MQTTVersion = options.MQTTVersion;
1084 
1085  MyLog(LOGA_DEBUG, "Connecting");
1086  rc = MQTTAsync_connect(cinfo.c, &opts);
1087  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1088  if (rc != MQTTASYNC_SUCCESS)
1089  goto exit;
1090 
1091  while (!test_finished)
1092  #if defined(_WIN32)
1093  Sleep(100);
1094  #else
1095  usleep(10000L);
1096  #endif
1097 
1098  test_finished = 0;
1099  cinfo.should_fail = 0; /* should connect through the serverURIs in connect options*/
1100  opts.onSuccess = test6_onConnect;
1102  opts.context = &cinfo;
1103  opts.serverURIs = uris;
1104  opts.serverURIcount = 2;
1105 
1106  MyLog(LOGA_DEBUG, "Connecting");
1107  rc = MQTTAsync_connect(cinfo.c, &opts);
1108  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1109  if (rc != MQTTASYNC_SUCCESS)
1110  goto exit;
1111 
1112  while (!test_finished)
1113  #if defined(_WIN32)
1114  Sleep(100);
1115  #else
1116  usleep(10000L);
1117  #endif
1118 
1119  test_finished = 0;
1120  dopts.timeout = 0;
1121  dopts.onSuccess = test6_onDisconnect;
1122  dopts.context = &cinfo;
1123  MQTTAsync_disconnect(cinfo.c, &dopts);
1124 
1125  while (!test_finished)
1126  #if defined(_WIN32)
1127  Sleep(100);
1128  #else
1129  usleep(10000L);
1130  #endif
1131 
1132  MQTTAsync_destroy(&cinfo.c);
1133 
1134 exit:
1135  MyLog(LOGA_INFO, "TEST6: test %s. %d tests run, %d failures.",
1136  (failures == 0) ? "passed" : "failed", tests, failures);
1138  return failures;
1139 }
1140 
1141 
1142 
1143 /********************************************************************
1144 
1145 Test7: Persistence
1146 
1147 *********************************************************************/
1148 
1149 char* test7_topic = "C client test7";
1151 
1153 {
1154  MQTTAsync c = (MQTTAsync)context;
1155  MyLog(LOGA_DEBUG, "In onDisconnect failure callback %p", c);
1156 
1157  assert("Successful disconnect", 0, "disconnect failed", 0);
1158 
1159  test_finished = 1;
1160 }
1161 
1163 {
1164  MQTTAsync c = (MQTTAsync)context;
1165  MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
1166  test_finished = 1;
1167 }
1168 
1169 
1171 {
1172  MQTTAsync c = (MQTTAsync)context;
1174  int rc;
1175 
1176  MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c);
1178  opts.context = c;
1179 
1180  rc = MQTTAsync_disconnect(c, &opts);
1181  assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1182 }
1183 
1184 
1185 int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
1186 {
1187  MQTTAsync c = (MQTTAsync)context;
1188  static int message_count = 0;
1189 
1190  MyLog(LOGA_DEBUG, "Test7: received message id %d", message->msgid);
1191 
1193 
1194  MQTTAsync_freeMessage(&message);
1195  MQTTAsync_free(topicName);
1196 
1197  return 1;
1198 }
1199 
1200 
1201 static int test7_subscribed = 0;
1202 
1204 {
1205  MQTTAsync c = (MQTTAsync)context;
1206 
1207  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
1208 
1209  test7_subscribed = 1;
1210 }
1211 
1212 
1214 {
1215  MQTTAsync c = (MQTTAsync)context;
1217  int rc;
1218 
1219  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
1221  opts.context = c;
1222 
1223  rc = MQTTAsync_subscribe(c, test7_topic, 2, &opts);
1224  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1225  if (rc != MQTTASYNC_SUCCESS)
1226  test_finished = 1;
1227 }
1228 
1229 
1231 {
1232  MQTTAsync c = (MQTTAsync)context;
1234  int rc;
1235 
1236  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
1237  dopts.context = context;
1238  dopts.timeout = 1000;
1239  dopts.onSuccess = test7_onDisconnect;
1240  rc = MQTTAsync_disconnect(c, &dopts);
1241 
1242  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1243  if (rc != MQTTASYNC_SUCCESS)
1244  test_finished = 1;
1245 }
1246 
1247 
1248 /*********************************************************************
1249 
1250 Test7: Pending tokens
1251 
1252 *********************************************************************/
1253 int test7(struct Options options)
1254 {
1255  int subsqos = 2;
1256  MQTTAsync c;
1259  int rc = 0;
1263  MQTTAsync_token* tokens = NULL;
1264  int msg_count = 6;
1265 
1266  MyLog(LOGA_INFO, "Starting test 7 - pending tokens");
1267  fprintf(xml, "<testcase classname=\"test4\" name=\"pending tokens\"");
1269  test_finished = 0;
1270 
1271  rc = MQTTAsync_create(&c, options.connection, "async_test7",
1273  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
1274  if (rc != MQTTASYNC_SUCCESS)
1275  {
1276  MQTTAsync_destroy(&c);
1277  goto exit;
1278  }
1279 
1280  rc = MQTTAsync_setCallbacks(c, c, NULL, test7_messageArrived, NULL);
1281  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1282 
1283  opts.keepAliveInterval = 20;
1284  opts.username = "testuser";
1285  opts.password = "testpassword";
1286  opts.MQTTVersion = options.MQTTVersion;
1287 
1288  opts.will = &wopts;
1289  opts.will->message = "will message";
1290  opts.will->qos = 1;
1291  opts.will->retained = 0;
1292  opts.will->topicName = "will topic";
1293  opts.will = NULL;
1294 
1295  opts.onFailure = NULL;
1296  opts.context = c;
1297 
1298  opts.cleansession = 1;
1300  MyLog(LOGA_DEBUG, "Connecting to clean up");
1301  rc = MQTTAsync_connect(c, &opts);
1302  rc = 0;
1303  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1304  if (rc != MQTTASYNC_SUCCESS)
1305  goto exit;
1306 
1307  while (!test_finished)
1308  #if defined(_WIN32)
1309  Sleep(100);
1310  #else
1311  usleep(10000L);
1312  #endif
1313 
1314  test_finished = 0;
1315  MyLog(LOGA_DEBUG, "Connecting");
1316  opts.cleansession = 0;
1317  opts.onSuccess = test7_onConnect;
1318  rc = MQTTAsync_connect(c, &opts);
1319  rc = 0;
1320  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1321  if (rc != MQTTASYNC_SUCCESS)
1322  goto exit;
1323 
1324  while (!test7_subscribed)
1325  #if defined(_WIN32)
1326  Sleep(100);
1327  #else
1328  usleep(10000L);
1329  #endif
1330 
1331  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
1332  pubmsg.payloadlen = 11;
1333  pubmsg.qos = 2;
1334  pubmsg.retained = 0;
1335  rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, &ropts);
1336  MyLog(LOGA_DEBUG, "Token was %d", ropts.token);
1337  rc = MQTTAsync_isComplete(c, ropts.token);
1338  /*assert("0 rc from isComplete", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);*/
1339  rc = MQTTAsync_waitForCompletion(c, ropts.token, 5000L);
1340  assert("Good rc from waitForCompletion", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1341  rc = MQTTAsync_isComplete(c, ropts.token);
1342  assert("1 rc from isComplete", rc == 1, "rc was %d", rc);
1343 
1344  test7_messageCount = 0;
1345  int i = 0;
1346  pubmsg.qos = 2;
1347  for (i = 0; i < msg_count; ++i)
1348  {
1349  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
1350  pubmsg.payloadlen = 11;
1351  //pubmsg.qos = (pubmsg.qos == 2) ? 1 : 2;
1352  pubmsg.retained = 0;
1353  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &ropts);
1354  }
1355  /* disconnect immediately without receiving the incoming messages */
1356  dopts.timeout = 0;
1357  dopts.onSuccess = test7_onDisconnect;
1358  dopts.context = c;
1359  MQTTAsync_disconnect(c, &dopts); /* now there should be "orphaned" publications */
1360 
1361  while (!test_finished)
1362  #if defined(_WIN32)
1363  Sleep(100);
1364  #else
1365  usleep(10000L);
1366  #endif
1367  test_finished = 0;
1368 
1369  rc = MQTTAsync_getPendingTokens(c, &tokens);
1370  assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1371 
1372  assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
1373  MQTTAsync_free(tokens);
1374 
1375  MQTTAsync_destroy(&c); /* force re-reading persistence on create */
1376 
1378  rc = MQTTAsync_create(&c, options.connection, "async_test7", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
1379  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
1380  if (rc != MQTTASYNC_SUCCESS)
1381  {
1382  MQTTAsync_destroy(&c);
1383  goto exit;
1384  }
1385 
1386  rc = MQTTAsync_getPendingTokens(c, &tokens);
1387  assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1388 
1389  assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
1390  if (tokens)
1391  {
1392  int i = 0;
1393  while (tokens[i] != -1)
1394  MyLog(LOGA_DEBUG, "Delivery token %d", tokens[i++]);
1395  MQTTAsync_free(tokens);
1396  //The following assertion should work, does with RSMB, but not Mosquitto
1397  //assert1("no of tokens should be count", i == msg_count, "no of tokens %d count %d", i, msg_count);
1398  }
1399 
1400  rc = MQTTAsync_setCallbacks(c, c, NULL, test7_messageArrived, NULL);
1401  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1402 
1403  MyLog(LOGA_DEBUG, "Reconnecting");
1404  opts.context = c;
1405  if (MQTTAsync_connect(c, &opts) != 0)
1406  {
1407  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1408  goto exit;
1409  }
1410 
1411  #if defined(_WIN32)
1412  Sleep(5000);
1413  #else
1414  usleep(5000000L);
1415  #endif
1416 
1417  rc = MQTTAsync_getPendingTokens(c, &tokens);
1418  assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1419  /* assert("should get no tokens back", tokens == NULL, "tokens was %p", tokens);
1420 
1421  assert1("no of messages should be count", test7_messageCount == msg_count, "no of tokens %d count %d",
1422  test7_messageCount, msg_count);
1423 
1424  assertions fail against Mosquitto - needs testing */
1425 
1427  dopts.onSuccess = test7_onDisconnect;
1428  dopts.timeout = 1000;
1429  MQTTAsync_disconnect(c, &dopts);
1430 
1431  while (!test_finished)
1432  #if defined(_WIN32)
1433  Sleep(100);
1434  #else
1435  usleep(10000L);
1436  #endif
1437 
1438  MQTTAsync_destroy(&c);
1439 
1440 exit:
1441  MyLog(LOGA_INFO, "TEST7: test %s. %d tests run, %d failures.",
1442  (failures == 0) ? "passed" : "failed", tests, failures);
1444  return failures;
1445 }
1446 
1447 
1448 
1449 /*********************************************************************
1450 
1451 Test8: Incomplete commands and requests
1452 
1453 *********************************************************************/
1454 
1455 char* test8_topic = "C client test8";
1459 
1461 {
1462  MQTTAsync c = (MQTTAsync)context;
1463 
1464  MyLog(LOGA_DEBUG, "In publish onSuccess callback %p token %d", c, response->token);
1465 
1466 }
1467 
1469 {
1470  MQTTAsync c = (MQTTAsync)context;
1471  MyLog(LOGA_DEBUG, "In onPublish failure callback %p", c);
1472 
1473  assert("Response code should be interrupted", response->code == MQTTASYNC_OPERATION_INCOMPLETE,
1474  "rc was %d", response->code);
1475 
1477 }
1478 
1479 
1481 {
1482  MQTTAsync c = (MQTTAsync)context;
1483  MyLog(LOGA_DEBUG, "In onDisconnect failure callback %p", c);
1484 
1485  assert("Successful disconnect", 0, "disconnect failed", 0);
1486 
1487  test_finished = 1;
1488 }
1489 
1490 
1492 {
1493  MQTTAsync c = (MQTTAsync)context;
1494  MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
1495  test_finished = 1;
1496 }
1497 
1498 
1500 {
1501  MQTTAsync c = (MQTTAsync)context;
1502 
1503  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
1504 
1505  test8_subscribed = 1;
1506 }
1507 
1508 
1510 {
1511  MQTTAsync c = (MQTTAsync)context;
1513  int rc;
1514 
1515  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
1517  opts.context = c;
1518 
1519  rc = MQTTAsync_subscribe(c, test8_topic, 2, &opts);
1520  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
1521  if (rc != MQTTASYNC_SUCCESS)
1522  test_finished = 1;
1523 }
1524 
1525 int test8_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
1526 {
1527  MQTTAsync c = (MQTTAsync)context;
1528  static int message_count = 0;
1529 
1530  MyLog(LOGA_DEBUG, "Test8: received message id %d", message->msgid);
1531 
1533 
1534  MQTTAsync_freeMessage(&message);
1535  MQTTAsync_free(topicName);
1536 
1537  return 1;
1538 }
1539 
1540 
1541 int test8(struct Options options)
1542 {
1543  int subsqos = 2;
1544  MQTTAsync c;
1546  int rc = 0;
1550  MQTTAsync_token* tokens = NULL;
1551  int msg_count = 6;
1552 
1553  MyLog(LOGA_INFO, "Starting test 8 - incomplete commands");
1554  fprintf(xml, "<testcase classname=\"test4\" name=\"incomplete commands\"");
1556  test_finished = 0;
1557 
1558  rc = MQTTAsync_create(&c, options.connection, "async_test8",
1560  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
1561  if (rc != MQTTASYNC_SUCCESS)
1562  {
1563  MQTTAsync_destroy(&c);
1564  goto exit;
1565  }
1566 
1567  rc = MQTTAsync_setCallbacks(c, c, NULL, test8_messageArrived, NULL);
1568  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1569 
1570  opts.keepAliveInterval = 20;
1571  opts.username = "testuser";
1572  opts.password = "testpassword";
1573  opts.MQTTVersion = options.MQTTVersion;
1574 
1575  opts.onFailure = NULL;
1576  opts.context = c;
1577 
1578  MyLog(LOGA_DEBUG, "Connecting");
1579  opts.cleansession = 1;
1580  opts.onSuccess = test8_onConnect;
1581  rc = MQTTAsync_connect(c, &opts);
1582  rc = 0;
1583  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1584  if (rc != MQTTASYNC_SUCCESS)
1585  goto exit;
1586 
1587  while (!test8_subscribed)
1588  #if defined(_WIN32)
1589  Sleep(100);
1590  #else
1591  usleep(10000L);
1592  #endif
1593 
1594  int i = 0;
1595  pubmsg.qos = 2;
1596  ropts.onSuccess = test8_onPublish;
1598  ropts.context = c;
1599  for (i = 0; i < msg_count; ++i)
1600  {
1601  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
1602  pubmsg.payloadlen = 11;
1603  pubmsg.qos = (pubmsg.qos == 2) ? 1 : 2; /* alternate */
1604  pubmsg.retained = 0;
1605  rc = MQTTAsync_sendMessage(c, test8_topic, &pubmsg, &ropts);
1606  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1607  }
1608  /* disconnect immediately without completing the commands */
1609  dopts.timeout = 0;
1610  dopts.onSuccess = test8_onDisconnect;
1611  dopts.context = c;
1612  rc = MQTTAsync_disconnect(c, &dopts); /* now there should be incomplete commands */
1613  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1614 
1615  while (!test_finished)
1616  #if defined(_WIN32)
1617  Sleep(100);
1618  #else
1619  usleep(10000L);
1620  #endif
1621  test_finished = 0;
1622 
1623  rc = MQTTAsync_getPendingTokens(c, &tokens);
1624  assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1625  assert("should get no tokens back", tokens == NULL, "tokens was %p", tokens);
1626 
1627  assert("test8_publishFailures > 0", test8_publishFailures > 0,
1628  "test8_publishFailures = %d", test8_publishFailures);
1629 
1630  /* Now elicit failure callbacks on destroy */
1631 
1633 
1634  MyLog(LOGA_DEBUG, "Connecting");
1635  opts.cleansession = 0;
1636  opts.onSuccess = test8_onConnect;
1637  rc = MQTTAsync_connect(c, &opts);
1638  rc = 0;
1639  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1640  if (rc != MQTTASYNC_SUCCESS)
1641  goto exit;
1642 
1643  while (!test8_subscribed)
1644  #if defined(_WIN32)
1645  Sleep(100);
1646  #else
1647  usleep(10000L);
1648  #endif
1649 
1650  i = 0;
1651  pubmsg.qos = 2;
1652  ropts.onSuccess = test8_onPublish;
1654  ropts.context = c;
1655  for (i = 0; i < msg_count; ++i)
1656  {
1657  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
1658  pubmsg.payloadlen = 11;
1659  pubmsg.qos = (pubmsg.qos == 2) ? 1 : 2; /* alternate */
1660  pubmsg.retained = 0;
1661  rc = MQTTAsync_sendMessage(c, test8_topic, &pubmsg, &ropts);
1662  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1663  }
1664  /* disconnect immediately without completing the commands */
1665  dopts.timeout = 0;
1666  dopts.onSuccess = test8_onDisconnect;
1667  dopts.context = c;
1668  rc = MQTTAsync_disconnect(c, &dopts); /* now there should be incomplete commands */
1669  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1670 
1671  while (!test_finished)
1672  #if defined(_WIN32)
1673  Sleep(100);
1674  #else
1675  usleep(10000L);
1676  #endif
1677  test_finished = 0;
1678 
1679  rc = MQTTAsync_getPendingTokens(c, &tokens);
1680  assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1681  assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
1682  MQTTAsync_free(tokens);
1683 
1684  assert("test8_publishFailures == 0", test8_publishFailures == 0,
1685  "test8_publishFailures = %d", test8_publishFailures);
1686 
1687  MQTTAsync_destroy(&c);
1688 
1689  assert("test8_publishFailures > 0", test8_publishFailures > 0,
1690  "test8_publishFailures = %d", test8_publishFailures);
1691 
1692  /* cleanup persistence of any left over message data*/
1693 
1695  rc = MQTTAsync_create(&c, options.connection, "async_test8",
1697  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
1698  if (rc != MQTTASYNC_SUCCESS)
1699  {
1700  MQTTAsync_destroy(&c);
1701  goto exit;
1702  }
1703 
1704  test8_subscribed = 0;
1705  opts.cleansession = 1;
1706  opts.context = c;
1707 
1708  rc = MQTTAsync_connect(c, &opts);
1709  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1710  if (rc != MQTTASYNC_SUCCESS)
1711  goto exit;
1712 
1713  while (!test8_subscribed)
1714 #if defined(_WIN32)
1715  Sleep(100);
1716 #else
1717  usleep(10000L);
1718 #endif
1719 
1720  test_finished = 0;
1721  dopts.context = c;
1722  rc = MQTTAsync_disconnect(c, &dopts);
1723  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1724 
1725  while (!test_finished)
1726 #if defined(_WIN32)
1727  Sleep(100);
1728 #else
1729  usleep(10000L);
1730 #endif
1731 
1732  MQTTAsync_destroy(&c);
1733 
1734 exit:
1735  MyLog(LOGA_INFO, "TEST8: test %s. %d tests run, %d failures.",
1736  (failures == 0) ? "passed" : "failed", tests, failures);
1738  return failures;
1739 }
1740 
1741 
1742 
1743 void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
1744 {
1745  printf("Trace : %d, %s\n", level, message);
1746 }
1747 
1748 
1749 
1750 
1751 int main(int argc, char** argv)
1752 {
1753  int rc = 0;
1754  int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test7, test8}; /* indexed starting from 1 */
1755  MQTTAsync_nameValue* info;
1756  int i;
1757 
1758  xml = fopen("TEST-test4.xml", "w");
1759  fprintf(xml, "<testsuite name=\"test4\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests)) - 1);
1760 
1761  getopts(argc, argv);
1762 
1764 
1765  info = MQTTAsync_getVersionInfo();
1766  while (info->name)
1767  {
1768  MyLog(LOGA_INFO, "%s: %s", info->name, info->value);
1769  info++;
1770  }
1771 
1772  for (i = 0; i < options.iterations; ++i)
1773  {
1774  if (options.test_no == -1)
1775  { /* run all the tests */
1777  {
1778  failures = 0;
1780  rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
1781  }
1782  }
1783  else
1784  {
1786  rc = tests[options.test_no](options); /* run just the selected test */
1787  }
1788  }
1789 
1790  if (rc == 0)
1791  MyLog(LOGA_INFO, "verdict pass");
1792  else
1793  MyLog(LOGA_INFO, "verdict fail");
1794 
1795  fprintf(xml, "</testsuite>\n");
1796  fclose(xml);
1797 
1798  return rc;
1799 }
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
void test2_onFailure(void *context, MQTTAsync_failureData *response)
Definition: test4.c:427
#define LOGA_INFO
Definition: test4.c:116
enum MQTTPropertyCodes value
int test7(struct Options options)
Definition: test4.c:1253
int test7_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test4.c:1185
int size
Definition: test11.c:53
void test3_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test4.c:609
void test3_onFailure(void *context, MQTTAsync_failureData *response)
Definition: test4.c:644
void test4_onPublish(void *context, MQTTAsync_successData *response)
Definition: test4.c:738
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
void test8_onConnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:1509
const char * name
Definition: MQTTAsync.h:1149
union MQTTAsync_successData::@46 alt
void test8_onDisconnectFailure(void *context, MQTTAsync_failureData *response)
Definition: test4.c:1480
int test4(struct Options options)
Definition: test4.c:854
void test1_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test4.c:321
const char * message
Definition: MQTTAsync.h:996
char * connection
const char * topicName
Definition: MQTTAsync.h:994
char test_topic[100]
Definition: test4.c:528
void test7_onUnsubscribe(void *context, MQTTAsync_successData *response)
Definition: test4.c:1170
void test1_onConnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:339
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
long elapsed(START_TIME_TYPE start_time)
Definition: test4.c:204
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
void test6_onDisconnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:1036
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
int test8_messageCount
Definition: test4.c:1456
void test7_onConnectOnly(void *context, MQTTAsync_successData *response)
Definition: test4.c:1230
void test8_onPublishFailure(void *context, MQTTAsync_failureData *response)
Definition: test4.c:1468
struct pubsub_opts opts
Definition: paho_c_pub.c:42
MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
Definition: MQTTAsync.c:4909
void * MQTTAsync
Definition: MQTTAsync.h:239
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
void MQTTAsync_free(void *memory)
Definition: MQTTAsync.c:2626
#define malloc(x)
Definition: Heap.h:41
void MQTTAsync_freeMessage(MQTTAsync_message **message)
Definition: MQTTAsync.c:2615
int test8_publishFailures
Definition: test4.c:1458
void myassert(char *filename, int lineno, char *description, int value, char *format,...)
Definition: test4.c:237
int MQTTAsync_unsubscribe(MQTTAsync handle, const char *topic, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4209
void getopts(int argc, char **argv)
Definition: test4.c:65
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
Definition: MQTTAsync.c:4903
char * test7_topic
Definition: test4.c:1149
#define LOGA_DEBUG
Definition: test4.c:115
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
MQTTASYNC_TRACE_LEVELS
Definition: MQTTAsync.h:1650
static char msg_buf[512]
Definition: Log.c:122
char * cur_output
Definition: test4.c:222
void test1_onDisconnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:263
int test8(struct Options options)
Definition: test4.c:1541
void test8_onPublish(void *context, MQTTAsync_successData *response)
Definition: test4.c:1460
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4121
int test3(struct Options options)
Definition: test4.c:663
int failures
Definition: test4.c:218
#define MQTTASYNC_OPERATION_INCOMPLETE
Definition: MQTTAsync.h:163
const char * message
Definition: MQTTAsync.h:518
void test7_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test4.c:1203
#define MQTTAsync_willOptions_initializer
Definition: MQTTAsync.h:1014
constexpr size_t count()
Definition: core.h:960
char output[3000]
Definition: test4.c:221
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTAsync.c:737
void test7_onDisconnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:1162
int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
Definition: MQTTAsync.c:4737
void test8_onDisconnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:1491
int test7_messageCount
Definition: test4.c:1150
description
Definition: setup.py:19
#define MQTTAsync_disconnectOptions_initializer
Definition: MQTTAsync.h:1422
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:702
int message_count
Definition: test5.c:72
void test3_onConnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:627
void test5_onConnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:933
char * test8_topic
Definition: test4.c:1455
void test2_onConnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:436
void MyLog(int LOGA_level, char *format,...)
Definition: test4.c:120
void test3_onDisconnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:533
MQTTAsync_token token
Definition: MQTTAsync.h:549
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1393
MQTTAsync_token token
Definition: MQTTAsync.h:714
struct MQTTAsync_connectOptions::@55 binarypwd
char clientid[24]
Definition: test4.c:527
void test8_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test4.c:1499
int test5(struct Options options)
Definition: test4.c:949
int test4_payloadlen
Definition: test4.c:736
void usage(void)
Definition: test4.c:41
#define MQTTAsync_connectOptions_initializer
Definition: MQTTAsync.h:1335
int MQTTAsync_token
Definition: MQTTAsync.h:249
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
int test3_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test4.c:563
void test5_onConnectFailure(void *context, MQTTAsync_failureData *response)
Definition: test4.c:920
void test1_onUnsubscribe(void *context, MQTTAsync_successData *response)
Definition: test4.c:271
int msg_count
Definition: test11.c:559
void test3_onPublish(void *context, MQTTAsync_successData *response)
Definition: test4.c:541
MQTTAsync_willOptions * will
Definition: MQTTAsync.h:1214
void test4_onConnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:832
int index
Definition: test4.c:526
#define MQTTCLIENT_PERSISTENCE_DEFAULT
int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
Definition: MQTTAsync.c:4803
int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen, const void *payload, int qos, int retained, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4230
static int test7_subscribed
Definition: test4.c:1201
void test7_onConnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:1213
int test6(struct Options options)
Definition: test4.c:1050
int test8_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test4.c:1525
void test6_onConnectFailure(void *context, MQTTAsync_failureData *response)
Definition: test4.c:1009
void test3_onUnsubscribe(void *context, MQTTAsync_successData *response)
Definition: test4.c:548
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
const char * value
Definition: MQTTAsync.h:1150
void write_test_result(void)
Definition: test4.c:224
void test6_onConnect(void *context, MQTTAsync_successData *response)
Definition: test4.c:1024
#define assert(a, b, c, d)
Definition: test4.c:214
START_TIME_TYPE global_start_time
Definition: test4.c:220
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
Definition: MQTTAsync.c:4897
int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const MQTTAsync_message *message, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4328
MQTTClient c
Definition: test10.c:1656
dictionary context
Definition: test2.py:57
char * clientid
Definition: test6.c:54
int tests
Definition: test4.c:217
#define num_clients
volatile int test_finished
Definition: test4.c:258
null localtime_s(...)
Definition: chrono.h:286
int test1(struct Options options)
Definition: test4.c:361
#define MQTTCLIENT_PERSISTENCE_NONE
int message_count
Definition: test4.c:529
int test2_onFailure_called
Definition: test4.c:425
void * test4_payload
Definition: test4.c:735
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
Definition: test4.c:1743
#define START_TIME_TYPE
Definition: test4.c:177
char * test_topic
Definition: test4.c:260
int test2(struct Options options)
Definition: test4.c:451
#define ARRAY_SIZE(a)
Definition: test4.c:39
void test4_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test4.c:810
FILE * xml
Definition: test4.c:219
char *const * serverURIs
Definition: MQTTAsync.h:1277
MQTTAsync c
Definition: test4.c:525
#define MQTTAsync_message_initializer
Definition: MQTTAsync.h:319
void test7_onDisconnectFailure(void *context, MQTTAsync_failureData *response)
Definition: test4.c:1152
MQTTAsync c
Definition: test4.c:1005
START_TIME_TYPE start_clock(void)
Definition: test4.c:179
enum MQTTReasonCodes rc
Definition: test10.c:1112
int main(int argc, char **argv)
Definition: test4.c:1751
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
#define MQTTVERSION_DEFAULT
Definition: MQTTAsync.h:195
int test1_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test4.c:286
int test_no
Definition: test1.c:54
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1387
int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
Definition: MQTTAsync.c:4848
int test4_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test4.c:745
int test8_subscribed
Definition: test4.c:1457
struct Options options


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48