test1.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2018 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs - MQTT 3.1.1 support
16  * Ian Craggs - change will message test back to using proxy
17  *******************************************************************************/
18 
19 
25 #include "MQTTClient.h"
26 #include <string.h>
27 #include <stdlib.h>
28 
29 #if !defined(_WINDOWS)
30  #include <sys/time.h>
31  #include <sys/socket.h>
32  #include <unistd.h>
33  #include <errno.h>
34 #else
35  #include <windows.h>
36  #define setenv(a, b, c) _putenv_s(a, b)
37 #endif
38 
/**
 * Number of elements in the array a.
 * Only valid for true array objects; a pointer (including an array function
 * parameter, which decays to a pointer) gives a meaningless result.
 * The argument is parenthesized so any array-valued expression can be passed.
 */
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
40 
/**
 * Print the (minimal) help text to stdout and terminate the process
 * with a failure exit status. Does not return.
 */
void usage(void)
{
	fputs("help!!\n", stdout);
	exit(EXIT_FAILURE);
}
46 
47 struct Options
48 {
49  char* connection;
50  char** haconnections;
52  int hacount;
53  int verbose;
54  int test_no;
55  int MQTTVersion;
56  int iterations;
57 } options =
58 {
59  "tcp://mqtt.eclipse.org:1883",
60  NULL,
61  "tcp://localhost:1883",
62  0,
63  0,
64  0,
66  1,
67 };
68 
69 void getopts(int argc, char** argv)
70 {
71  int count = 1;
72 
73  while (count < argc)
74  {
75  if (strcmp(argv[count], "--test_no") == 0)
76  {
77  if (++count < argc)
78  options.test_no = atoi(argv[count]);
79  else
80  usage();
81  }
82  else if (strcmp(argv[count], "--connection") == 0)
83  {
84  if (++count < argc)
85  {
86  options.connection = argv[count];
87  printf("\nSetting connection to %s\n", options.connection);
88  }
89  else
90  usage();
91  }
92  else if (strcmp(argv[count], "--haconnections") == 0)
93  {
94  if (++count < argc)
95  {
96  char* tok = strtok(argv[count], " ");
97  options.hacount = 0;
98  options.haconnections = malloc(sizeof(char*) * 5);
99  while (tok)
100  {
101  options.haconnections[options.hacount] = malloc(strlen(tok) + 1);
102  strcpy(options.haconnections[options.hacount], tok);
103  options.hacount++;
104  tok = strtok(NULL, " ");
105  }
106  }
107  else
108  usage();
109  }
110  else if (strcmp(argv[count], "--proxy_connection") == 0)
111  {
112  if (++count < argc)
114  else
115  usage();
116  }
117  else if (strcmp(argv[count], "--MQTTversion") == 0)
118  {
119  if (++count < argc)
120  {
121  options.MQTTVersion = atoi(argv[count]);
122  printf("setting MQTT version to %d\n", options.MQTTVersion);
123  }
124  else
125  usage();
126  }
127  else if (strcmp(argv[count], "--iterations") == 0)
128  {
129  if (++count < argc)
130  options.iterations = atoi(argv[count]);
131  else
132  usage();
133  }
134  else if (strcmp(argv[count], "--verbose") == 0)
135  {
136  options.verbose = 1;
137  printf("\nSetting verbose on\n");
138  }
139  count++;
140  }
141 }
142 
143 
144 #define LOGA_DEBUG 0
145 #define LOGA_INFO 1
146 #include <stdarg.h>
147 #include <time.h>
148 #include <sys/timeb.h>
149 void MyLog(int LOGA_level, char* format, ...)
150 {
151  static char msg_buf[256];
152  va_list args;
153 #if defined(_WIN32) || defined(_WINDOWS)
154  struct timeb ts;
155 #else
156  struct timeval ts;
157 #endif
158  struct tm timeinfo;
159 
160  if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
161  return;
162 
163 #if defined(_WIN32) || defined(_WINDOWS)
164  ftime(&ts);
165  localtime_s(&timeinfo, &ts.time);
166 #else
167  gettimeofday(&ts, NULL);
168  localtime_r(&ts.tv_sec, &timeinfo);
169 #endif
170  strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
171 
172 #if defined(_WIN32) || defined(_WINDOWS)
173  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
174 #else
175  sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000);
176 #endif
177 
178  va_start(args, format);
179  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
180  va_end(args);
181 
182  printf("%s\n", msg_buf);
183  fflush(stdout);
184 }
185 
186 
187 #if defined(_WIN32) || defined(_WINDOWS)
188 #define mqsleep(A) Sleep(1000*A)
189 #define START_TIME_TYPE DWORD
190 static DWORD start_time = 0;
192 {
193  return GetTickCount();
194 }
195 #elif defined(AIX)
196 #define mqsleep sleep
197 #define START_TIME_TYPE struct timespec
199 {
200  static struct timespec start;
201  clock_gettime(CLOCK_REALTIME, &start);
202  return start;
203 }
204 #else
205 #define mqsleep sleep
206 #define START_TIME_TYPE struct timeval
207 /* TODO - unused - remove? static struct timeval start_time; */
209 {
210  struct timeval start_time;
211  gettimeofday(&start_time, NULL);
212  return start_time;
213 }
214 #endif
215 
216 
/*
 * elapsed(): milliseconds between a previously captured start time and now.
 * One variant per platform; the guards match the ones that select
 * START_TIME_TYPE so the parameter type always agrees with it.
 */
#if defined(_WIN32) || defined(_WINDOWS)
/**
 * @param start_time tick count captured earlier with GetTickCount()
 * @return milliseconds elapsed since start_time
 */
long elapsed(START_TIME_TYPE start_time)
{
	return GetTickCount() - start_time;
}
#elif defined(AIX)
/**
 * @param start CLOCK_REALTIME reading captured earlier
 * @return milliseconds elapsed since start
 */
long elapsed(struct timespec start)
{
	struct timespec now, res;

	clock_gettime(CLOCK_REALTIME, &now);
	ntimersub(now, start, res);
	return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
/**
 * @param start_time gettimeofday() reading captured earlier
 * @return milliseconds elapsed since start_time
 */
long elapsed(struct timeval start_time)
{
	struct timeval now;
	long secs;
	long usecs;

	gettimeofday(&now, NULL);

	/* Same normalization as the BSD timersub() macro, written out so the
	   code does not depend on that non-standard extension. */
	secs = (long)(now.tv_sec - start_time.tv_sec);
	usecs = (long)(now.tv_usec - start_time.tv_usec);
	if (usecs < 0)
	{
		--secs;
		usecs += 1000000L;
	}
	return secs * 1000 + usecs / 1000;
}
#endif
242 
243 
244 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
245 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
246 
247 int tests = 0;
248 int failures = 0;
249 FILE* xml;
251 char output[3000];
253 
254 
256 {
257  long duration = elapsed(global_start_time);
258 
259  fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
260  if (cur_output != output)
261  {
262  fprintf(xml, "%s", output);
263  cur_output = output;
264  }
265  fprintf(xml, "</testcase>\n");
266 }
267 
268 
269 void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
270 {
271  ++tests;
272  if (!value)
273  {
274  va_list args;
275 
276  ++failures;
277  MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
278 
279  va_start(args, format);
280  vprintf(format, args);
281  va_end(args);
282 
283  cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
284  description, filename, lineno);
285  }
286  else
287  MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
288 }
289 
290 
291 /*********************************************************************
292 
293 Test1: single-threaded client
294 
295 *********************************************************************/
297 {
300  MQTTClient_message* m = NULL;
301  char* topicName = NULL;
302  int topicLen;
303  int i = 0;
304  int iterations = 50;
305  int rc;
306 
307  MyLog(LOGA_DEBUG, "%d messages at QoS %d", iterations, qos);
308  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
309  pubmsg.payloadlen = 11;
310  pubmsg.qos = qos;
311  pubmsg.retained = 0;
312 
313  for (i = 0; i< iterations; ++i)
314  {
315  if (i % 10 == 0)
316  rc = MQTTClient_publish(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, &dt);
317  else
318  rc = MQTTClient_publishMessage(c, test_topic, &pubmsg, &dt);
319  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
320 
321  if (qos > 0)
322  {
323  rc = MQTTClient_waitForCompletion(c, dt, 5000L);
324  assert("Good rc from waitforCompletion", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
325  }
326 
327  rc = MQTTClient_receive(c, &topicName, &topicLen, &m, 5000);
328  assert("Good rc from receive", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
329  if (topicName)
330  {
331  MyLog(LOGA_DEBUG, "Message received on topic %s is %.*s", topicName, m->payloadlen, (char*)(m->payload));
332  if (pubmsg.payloadlen != m->payloadlen ||
333  memcmp(m->payload, pubmsg.payload, m->payloadlen) != 0)
334  {
335  failures++;
336  MyLog(LOGA_INFO, "Error: wrong data - received lengths %d %d", pubmsg.payloadlen, m->payloadlen);
337  break;
338  }
339  MQTTClient_free(topicName);
341  }
342  else
343  printf("No message received within timeout period\n");
344  }
345 
346  /* receive any outstanding messages */
347  MQTTClient_receive(c, &topicName, &topicLen, &m, 2000);
348  while (topicName)
349  {
350  printf("Message received on topic %s is %.*s.\n", topicName, m->payloadlen, (char*)(m->payload));
351  MQTTClient_free(topicName);
353  MQTTClient_receive(c, &topicName, &topicLen, &m, 2000);
354  }
355 }
356 
357 
358 int test1(struct Options options)
359 {
360  int subsqos = 2;
361  MQTTClient c;
364  int rc = 0;
365  char* test_topic = "C client test1";
366 
367  fprintf(xml, "<testcase classname=\"test1\" name=\"single threaded client using receive\"");
369  failures = 0;
370  MyLog(LOGA_INFO, "Starting test 1 - single threaded client using receive");
371 
372  rc = MQTTClient_create(&c, options.connection, "single_threaded_test",
374  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
375  if (rc != MQTTCLIENT_SUCCESS)
376  {
377  MQTTClient_destroy(&c);
378  goto exit;
379  }
380 
381  opts.keepAliveInterval = 20;
382  opts.cleansession = 1;
383  opts.username = "testuser";
384  opts.password = "testpassword";
385  opts.MQTTVersion = options.MQTTVersion;
386  if (options.haconnections != NULL)
387  {
388  opts.serverURIs = options.haconnections;
389  opts.serverURIcount = options.hacount;
390  }
391 
392  opts.will = &wopts;
393  opts.will->message = "will message";
394  opts.will->qos = 1;
395  opts.will->retained = 0;
396  opts.will->topicName = "will topic";
397  opts.will = NULL;
398 
399  MyLog(LOGA_DEBUG, "Connecting");
400  rc = MQTTClient_connect(c, &opts);
401  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
402  if (rc != MQTTCLIENT_SUCCESS)
403  goto exit;
404 
405  rc = MQTTClient_subscribe(c, test_topic, subsqos);
406  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
407 
408  test1_sendAndReceive(c, 0, test_topic);
409  test1_sendAndReceive(c, 1, test_topic);
410  test1_sendAndReceive(c, 2, test_topic);
411 
412  MyLog(LOGA_DEBUG, "Stopping\n");
413 
414  rc = MQTTClient_unsubscribe(c, test_topic);
415  assert("Unsubscribe successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
416  rc = MQTTClient_disconnect(c, 0);
417  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
418 
419  /* Just to make sure we can connect again */
420  rc = MQTTClient_connect(c, &opts);
421  assert("Connect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
422  rc = MQTTClient_disconnect(c, 0);
423  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
424 
425  MQTTClient_destroy(&c);
426 
427 exit:
428  MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
429  (failures == 0) ? "passed" : "failed", tests, failures);
431  return failures;
432 }
433 
434 
435 /*********************************************************************
436 
437 Test2: multi-threaded client using callbacks
438 
439 *********************************************************************/
440 volatile int test2_arrivedcount = 0;
443 
445 {
447 }
448 
449 int test2_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
450 {
452  MyLog(LOGA_DEBUG, "Callback: %d message received on topic %s is %.*s.",
453  test2_arrivedcount, topicName, m->payloadlen, (char*)(m->payload));
454  if (test2_pubmsg.payloadlen != m->payloadlen ||
455  memcmp(m->payload, test2_pubmsg.payload, m->payloadlen) != 0)
456  {
457  failures++;
458  MyLog(LOGA_INFO, "Error: wrong data received lengths %d %d\n", test2_pubmsg.payloadlen, m->payloadlen);
459  }
460  MQTTClient_free(topicName);
462  return 1;
463 }
464 
465 
467 {
469  int i = 0;
470  int iterations = 50;
471  int rc = 0;
472  int wait_seconds = 0;
473 
475 
476  MyLog(LOGA_INFO, "%d messages at QoS %d", iterations, qos);
477  test2_pubmsg.payload = "a much longer message that we can shorten to the extent that we need to";
478  test2_pubmsg.payloadlen = 27;
479  test2_pubmsg.qos = qos;
480  test2_pubmsg.retained = 0;
481 
482  for (i = 1; i <= iterations; ++i)
483  {
484  if (i % 10 == 0)
485  rc = MQTTClient_publish(c, test_topic, test2_pubmsg.payloadlen, test2_pubmsg.payload,
486  test2_pubmsg.qos, test2_pubmsg.retained, NULL);
487  else
488  rc = MQTTClient_publishMessage(c, test_topic, &test2_pubmsg, &dt);
489  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
490 
491  #if defined(_WIN32)
492  Sleep(100);
493  #else
494  usleep(100000L);
495  #endif
496 
497  wait_seconds = 10;
498  while ((test2_arrivedcount < i) && (wait_seconds-- > 0))
499  {
500  MyLog(LOGA_DEBUG, "Arrived %d count %d", test2_arrivedcount, i);
501  #if defined(_WIN32)
502  Sleep(1000);
503  #else
504  usleep(1000000L);
505  #endif
506  }
507  assert("Message Arrived", wait_seconds > 0,
508  "Time out waiting for message %d\n", i );
509  }
510  if (qos > 0)
511  {
512  /* MQ Telemetry can send a message to a subscriber before the server has
513  completed the QoS 2 handshake with the publisher. For QoS 1 and 2,
514  allow time for the final delivery complete callback before checking
515  that all expected callbacks have been made */
516  wait_seconds = 10;
517  while ((test2_deliveryCompleted < iterations) && (wait_seconds-- > 0))
518  {
519  MyLog(LOGA_DEBUG, "Delivery Completed %d count %d", test2_deliveryCompleted, i);
520  #if defined(_WIN32)
521  Sleep(1000);
522  #else
523  usleep(1000000L);
524  #endif
525  }
526  assert("All Deliveries Complete", wait_seconds > 0,
527  "Number of deliveryCompleted callbacks was %d\n",
529  }
530 }
531 
532 
533 int test2(struct Options options)
534 {
535  char* testname = "test2";
536  int subsqos = 2;
537  /* TODO - usused - remove ? MQTTClient_deliveryToken* dt = NULL; */
538  MQTTClient c;
540  int rc = 0;
541  char* test_topic = "C client test2";
542 
543  fprintf(xml, "<testcase classname=\"test1\" name=\"multi-threaded client using callbacks\"");
544  MyLog(LOGA_INFO, "Starting test 2 - multi-threaded client using callbacks");
546  failures = 0;
547 
548  MQTTClient_create(&c, options.connection, "multi_threaded_sample", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
549 
550  opts.keepAliveInterval = 20;
551  opts.cleansession = 1;
552  opts.MQTTVersion = options.MQTTVersion;
553  opts.username = "testuser";
554  opts.binarypwd.data = "testpassword";
555  opts.binarypwd.len = (int)strlen(opts.binarypwd.data);
556  if (options.haconnections != NULL)
557  {
558  opts.serverURIs = options.haconnections;
559  opts.serverURIcount = options.hacount;
560  }
561 
563  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
564 
565  MyLog(LOGA_DEBUG, "Connecting");
566  rc = MQTTClient_connect(c, &opts);
567  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
568  if (rc != MQTTCLIENT_SUCCESS)
569  goto exit;
570 
571  rc = MQTTClient_subscribe(c, test_topic, subsqos);
572  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
573 
574  test2_sendAndReceive(c, 0, test_topic);
575  test2_sendAndReceive(c, 1, test_topic);
576  test2_sendAndReceive(c, 2, test_topic);
577 
578  MyLog(LOGA_DEBUG, "Stopping");
579 
580  rc = MQTTClient_unsubscribe(c, test_topic);
581  assert("Unsubscribe successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
582  rc = MQTTClient_disconnect(c, 0);
583  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
584 
585  MQTTClient_destroy(&c);
586 
587 exit:
588  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
589  (failures == 0) ? "passed" : "failed", testname, tests, failures);
591  return failures;
592 }
593 
594 
595 /*********************************************************************
596 
597 Test 3: connack return codes
598 
599 for AMQTDD, needs an amqtdd.cfg of:
600 
601  allow_anonymous false
602  password_file passwords
603 
604 and a passwords file of:
605 
606  Admin:Admin
607 
608 *********************************************************************/
609 int test3(struct Options options)
610 {
611  char* testname = "test3";
612  int rc;
613  MQTTClient c;
616 
617  fprintf(xml, "<testcase classname=\"test1\" name=\"connack return codes\"");
619  failures = 0;
620  MyLog(LOGA_INFO, "Starting test 3 - connack return codes");
621 
622 #if 0
623  /* clientid too long (RC = 2) */
624  rc = MQTTClient_create(&c, options.connection, "client_ID_too_long_for_MQTT_protocol_version_3",
626  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
627  rc = MQTTClient_connect(c, &opts);
628  assert("identifier rejected", rc == 2, "rc was %d\n", rc);
629  MQTTClient_destroy(&c);
630 #endif
631  /* broker unavailable (RC = 3) - TDD when allow_anonymous not set*/
632  rc = MQTTClient_create(&c, options.connection, "The C Client", MQTTCLIENT_PERSISTENCE_NONE, NULL);
633  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
634 #if 0
635  rc = MQTTClient_connect(c, &opts);
636  assert("broker unavailable", rc == 3, "rc was %d\n", rc);
637 
638  /* authentication failure (RC = 4) */
639  opts.username = "Admin";
640  opts.password = "fred";
641  rc = MQTTClient_connect(c, &opts);
642  assert("Bad user name or password", rc == 4, "rc was %d\n", rc);
643 #endif
644 
645  /* authorization failure (RC = 5) */
646  opts.username = "Admin";
647  opts.password = "Admin";
648  /*opts.will = &wopts; "Admin" not authorized to publish to Will topic by default
649  opts.will->message = "will message";
650  opts.will->qos = 1;
651  opts.will->retained = 0;
652  opts.will->topicName = "will topic";*/
653  rc = MQTTClient_connect(c, &opts);
654  //assert("Not authorized", rc == 5, "rc was %d\n", rc);
655 
656 #if 0
657  /* successful connection (RC = 0) */
658  opts.username = "Admin";
659  opts.password = "Admin";
660  opts.will = NULL;
661  rc = MQTTClient_connect(c, &opts);
662  assert("successful connection", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
663  MQTTClient_disconnect(c, 0);
664  MQTTClient_destroy(&c);
665 #endif
666 
667 /* TODO - unused - remove ? exit: */
668  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
669  (failures == 0) ? "passed" : "failed", testname, tests, failures);
671  return failures;
672 }
673 
674 
675 /*********************************************************************
676 
677 Test 4: client persistence 1
678 
679 
680 *********************************************************************/
681 int test4_run(int qos)
682 {
683  char* testname = "test 4";
684  char* topic = "Persistence test 1";
685  int subsqos = 2;
686  MQTTClient c;
688  MQTTClient_message* m = NULL;
689  char* topicName = NULL;
690  int topicLen;
691  MQTTClient_deliveryToken* tokens = NULL;
692  int mytoken = -99;
693  char buffer[100];
694  int count = 3;
695  int i, rc;
696 
697  failures = 0;
698  MyLog(LOGA_INFO, "Starting test 4 - persistence, qos %d", qos);
699 
701 
702  opts.keepAliveInterval = 20;
703  opts.reliable = 0;
705  if (options.haconnections != NULL)
706  {
709  }
710 
711  MyLog(LOGA_DEBUG, "Cleanup by connecting clean session\n");
712  opts.cleansession = 1;
713  if ((rc = MQTTClient_connect(c, &opts)) != 0)
714  {
715  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
716  return -1;
717  }
718  opts.cleansession = 0;
719  MQTTClient_disconnect(c, 0);
720 
721  MyLog(LOGA_DEBUG, "Connecting\n");
722  if ((rc = MQTTClient_connect(c, &opts)) != 0)
723  {
724  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
725  return -1;
726  }
727 
728  /* subscribe so we can get messages back */
729  rc = MQTTClient_subscribe(c, topic, subsqos);
730  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
731 
732  /* send messages so that we can receive the same ones */
733  for (i = 0; i < count; ++i)
734  {
735  sprintf(buffer, "Message sequence no %d", i);
736  rc = MQTTClient_publish(c, topic, 10, buffer, qos, 0, NULL);
737  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
738  }
739 
740  /* disconnect immediately without receiving the incoming messages */
741  MQTTClient_disconnect(c, 0); /* now there should be "orphaned" publications */
742 
743  rc = MQTTClient_getPendingDeliveryTokens(c, &tokens);
744  assert("getPendingDeliveryTokens rc == 0", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
745 
746  assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
747  if (tokens)
748  {
749  int i = 0;
750 
751  while (tokens[i] != -1)
752  MyLog(LOGA_DEBUG, "Pending delivery token %d", tokens[i++]);
753  MQTTClient_free(tokens);
754  assert1("no of tokens should be count", i == count, "no of tokens %d count %d", i, count);
755  mytoken = tokens[0];
756  }
757 
758  MQTTClient_destroy(&c); /* force re-reading persistence on create */
759 
761 
762  rc = MQTTClient_getPendingDeliveryTokens(c, &tokens);
763  assert("getPendingDeliveryTokens rc == 0", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
764 
765  assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
766  if (tokens)
767  {
768  int i = 0;
769  while (tokens[i] != -1)
770  MyLog(LOGA_DEBUG, "Pending delivery token %d", tokens[i++]);
771  MQTTClient_free(tokens);
772  assert1("no of tokens should be count", i == count, "no of tokens %d count %d", i, count);
773  }
774 
775  MyLog(LOGA_DEBUG, "Reconnecting");
776  if (MQTTClient_connect(c, &opts) != 0)
777  {
778  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
779  return -1;
780  }
781 
782  for (i = 0; i < count; ++i)
783  {
784  int dup = 0;
785  do
786  {
787  dup = 0;
788  MQTTClient_receive(c, &topicName, &topicLen, &m, 5000);
789  if (m && m->dup)
790  {
791  assert("No duplicates should be received for qos 2", qos == 1, "qos is %d", qos);
792  MyLog(LOGA_DEBUG, "Duplicate message id %d", m->msgid);
794  MQTTClient_free(topicName);
795  dup = 1;
796  }
797  } while (dup == 1);
798  assert("should get a message", m != NULL, "m was %p", m);
799  if (m)
800  {
801  MyLog(LOGA_DEBUG, "Received message id %d", m->msgid);
802  assert("topicName is correct", strcmp(topicName, topic) == 0, "topicName is %s", topicName);
804  MQTTClient_free(topicName);
805  }
806  }
807 
808  MQTTClient_yield(); /* allow any unfinished protocol exchanges to finish */
809 
810  rc = MQTTClient_getPendingDeliveryTokens(c, &tokens);
811  assert("getPendingDeliveryTokens rc == 0", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
812  assert("should get no tokens back", tokens == NULL, "tokens was %p", tokens);
813 
814  MQTTClient_disconnect(c, 0);
815 
816  MQTTClient_destroy(&c);
817 
818 /* TODO - unused -remove? exit: */
819  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
820  (failures == 0) ? "passed" : "failed", testname, tests, failures);
821 
822  return failures;
823 }
824 
825 
826 int test4(struct Options options)
827 {
828  int rc = 0;
829  fprintf(xml, "<testcase classname=\"test1\" name=\"persistence\"");
831  rc = test4_run(1) + test4_run(2);
832  fprintf(xml, " time=\"%ld\" >\n", elapsed(global_start_time) / 1000);
833  if (cur_output != output)
834  {
835  fprintf(xml, "%s", output);
836  cur_output = output;
837  }
838  fprintf(xml, "</testcase>\n");
839  return rc;
840 }
841 
842 
843 /*********************************************************************
844 
845 Test 5: disconnect with quiesce timeout should allow exchanges to complete
846 
847 *********************************************************************/
848 int test5(struct Options options)
849 {
850  char* testname = "test 5";
851  char* topic = "Persistence test 2";
852  int subsqos = 2;
853  MQTTClient c;
855  MQTTClient_deliveryToken* tokens = NULL;
856  char buffer[100];
857  int count = 5;
858  int i, rc;
859 
860  fprintf(xml, "<testcase classname=\"test1\" name=\"disconnect with quiesce timeout should allow exchanges to complete\"");
862  failures = 0;
863  MyLog(LOGA_INFO, "Starting test 5 - disconnect with quiesce timeout should allow exchanges to complete");
864 
865  MQTTClient_create(&c, options.connection, "xrctest1_test_5", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
866 
867  opts.keepAliveInterval = 20;
868  opts.cleansession = 0;
869  opts.reliable = 0;
870  opts.MQTTVersion = options.MQTTVersion;
871  if (options.haconnections != NULL)
872  {
873  opts.serverURIs = options.haconnections;
874  opts.serverURIcount = options.hacount;
875  }
876 
877  MyLog(LOGA_DEBUG, "Connecting");
878  if ((rc = MQTTClient_connect(c, &opts)) != 0)
879  {
880  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
881  MQTTClient_destroy(&c);
882  goto exit;
883  }
884 
885  rc = MQTTClient_subscribe(c, topic, subsqos);
886  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
887 
888  for (i = 0; i < count; ++i)
889  {
890  sprintf(buffer, "Message sequence no %d", i);
891  rc = MQTTClient_publish(c, topic, 10, buffer, 1, 0, NULL);
892  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
893  }
894 
895  MQTTClient_disconnect(c, 1000); /* now there should be no "orphaned" publications */
896  MyLog(LOGA_DEBUG, "Disconnected");
897 
898  rc = MQTTClient_getPendingDeliveryTokens(c, &tokens);
899  assert("getPendingDeliveryTokens rc == 0", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
900 
901  assert("should get no tokens back", tokens == NULL, "tokens was %p", tokens);
902 
903  MQTTClient_destroy(&c);
904 
905 exit:
906  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
907  (failures == 0) ? "passed" : "failed", testname, tests, failures);
909  return failures;
910 }
911 
912 
913 /*********************************************************************
914 
915 Test 6: connectionLost and will message
916 
917 *********************************************************************/
919 volatile int test6_will_message_arrived = 0;
921 
922 void test6_connectionLost(void* context, char* cause)
923 {
924  MQTTClient c = (MQTTClient)context;
925  printf("%s -> Callback: connection lost\n", (c == test6_c1) ? "Client-1" : "Client-2");
927 }
928 
930 {
931  printf("Client-2 -> Callback: publish complete for token %d\n", token);
932 }
933 
934 char* test6_will_topic = "C Test 2: will topic";
935 char* test6_will_message = "will message from Client-1";
936 
937 int test6_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
938 {
939  MQTTClient c = (MQTTClient)context;
940  printf("%s -> Callback: message received on topic '%s' is '%.*s'.\n",
941  (c == test6_c1) ? "Client-1" : "Client-2", topicName, m->payloadlen, (char*)(m->payload));
942  if (c == test6_c2 && strcmp(topicName, test6_will_topic) == 0 && memcmp(m->payload, test6_will_message, m->payloadlen) == 0)
944  MQTTClient_free(topicName);
946  return 1;
947 }
948 
949 
950 int test6(struct Options options)
951 {
952  char* testname = "test6";
956  int rc, count;
957  char* mqttsas_topic = "MQTTSAS topic";
958 
959  failures = 0;
960  MyLog(LOGA_INFO, "Starting test 6 - connectionLost and will messages");
961  fprintf(xml, "<testcase classname=\"test1\" name=\"connectionLost and will messages\"");
963 
964  opts.keepAliveInterval = 2;
965  opts.cleansession = 1;
967  opts.will = &wopts;
969  opts.will->qos = 1;
970  opts.will->retained = 0;
972  if (options.haconnections != NULL)
973  {
974  opts.serverURIs = options.haconnections;
975  opts.serverURIcount = options.hacount;
976  }
977 
978  /* Client-1 with Will options */
980  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
981  if (rc != MQTTCLIENT_SUCCESS)
982  goto exit;
983 
985  assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
986  if (rc != MQTTCLIENT_SUCCESS)
987  goto exit;
988 
989  /* Connect to the broker */
990  rc = MQTTClient_connect(test6_c1, &opts);
991  assert("good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
992  if (rc != MQTTCLIENT_SUCCESS)
993  goto exit;
994 
995  /* Client - 2 (multi-threaded) */
996  rc = MQTTClient_create(&test6_c2, options.connection, "Client_2", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
997  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
998 
999  /* Set the callback functions for the client */
1001  assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1002 
1003  /* Connect to the broker */
1004  opts2.keepAliveInterval = 20;
1005  opts2.cleansession = 1;
1006  MyLog(LOGA_INFO, "Connecting Client_2 ...");
1007  rc = MQTTClient_connect(test6_c2, &opts2);
1008  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1009 
1011  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1012 
1013  /* now send the command which will break the connection and cause the will message to be sent */
1014  rc = MQTTClient_publish(test6_c1, mqttsas_topic, (int)strlen("TERMINATE"), "TERMINATE", 0, 0, NULL);
1015  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1016 
1017  MyLog(LOGA_INFO, "Waiting to receive the will message");
1018  count = 0;
1019  while (++count < 40)
1020  {
1021  #if defined(_WIN32)
1022  Sleep(1000L);
1023  #else
1024  sleep(1);
1025  #endif
1027  break;
1028  }
1029  assert("will message arrived", test6_will_message_arrived == 1,
1030  "will_message_arrived was %d\n", test6_will_message_arrived);
1031  assert("connection lost called", test6_connection_lost_called == 1,
1032  "connection_lost_called %d\n", test6_connection_lost_called);
1033 
1035  assert("Good rc from unsubscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1036 
1038  assert("Client-2 still connected", rc == 1, "isconnected is %d", rc);
1039 
1041  assert("Client-1 not connected", rc == 0, "isconnected is %d", rc);
1042 
1043  rc = MQTTClient_disconnect(test6_c2, 100L);
1044  assert("Good rc from disconnect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1045 
1048 
1049 exit:
1050  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.\n",
1051  (failures == 0) ? "passed" : "failed", testname, tests, failures);
1053  return failures;
1054 }
1055 
1056 
1058 {
1059  char* testname = "test6a";
1063  int rc, count;
1064  char* mqttsas_topic = "MQTTSAS topic";
1065 
1066  failures = 0;
1067  MyLog(LOGA_INFO, "Starting test 6 - connectionLost and binary will messages");
1068  fprintf(xml, "<testcase classname=\"test1\" name=\"connectionLost and binary will messages\"");
1070 
1071  opts.keepAliveInterval = 2;
1072  opts.cleansession = 1;
1074  opts.will = &wopts;
1076  opts.will->payload.len = (int)strlen(test6_will_message) + 1;
1077  opts.will->qos = 1;
1078  opts.will->retained = 0;
1080  if (options.haconnections != NULL)
1081  {
1082  opts.serverURIs = options.haconnections;
1083  opts.serverURIcount = options.hacount;
1084  }
1085 
1086  /* Client-1 with Will options */
1088  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1089  if (rc != MQTTCLIENT_SUCCESS)
1090  goto exit;
1091 
1093  assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1094  if (rc != MQTTCLIENT_SUCCESS)
1095  goto exit;
1096 
1097  /* Connect to the broker */
1098  rc = MQTTClient_connect(test6_c1, &opts);
1099  assert("good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1100  if (rc != MQTTCLIENT_SUCCESS)
1101  goto exit;
1102 
1103  /* Client - 2 (multi-threaded) */
1104  rc = MQTTClient_create(&test6_c2, options.connection, "Client_2", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
1105  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1106 
1107  /* Set the callback functions for the client */
1109  assert("good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1110 
1111  /* Connect to the broker */
1112  opts2.keepAliveInterval = 20;
1113  opts2.cleansession = 1;
1114  MyLog(LOGA_INFO, "Connecting Client_2 ...");
1115  rc = MQTTClient_connect(test6_c2, &opts2);
1116  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1117 
1119  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1120 
1121  /* now send the command which will break the connection and cause the will message to be sent */
1122  rc = MQTTClient_publish(test6_c1, mqttsas_topic, (int)strlen("TERMINATE"), "TERMINATE", 0, 0, NULL);
1123  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1124 
1125  MyLog(LOGA_INFO, "Waiting to receive the will message");
1126  count = 0;
1127  while (++count < 40)
1128  {
1129  #if defined(_WIN32)
1130  Sleep(1000L);
1131  #else
1132  sleep(1);
1133  #endif
1135  break;
1136  }
1137  assert("will message arrived", test6_will_message_arrived == 1,
1138  "will_message_arrived was %d\n", test6_will_message_arrived);
1139  assert("connection lost called", test6_connection_lost_called == 1,
1140  "connection_lost_called %d\n", test6_connection_lost_called);
1141 
1143  assert("Good rc from unsubscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1144 
1146  assert("Client-2 still connected", rc == 1, "isconnected is %d", rc);
1147 
1149  assert("Client-1 not connected", rc == 0, "isconnected is %d", rc);
1150 
1151  rc = MQTTClient_disconnect(test6_c2, 100L);
1152  assert("Good rc from disconnect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1153 
1156 
1157 exit:
1158  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.\n",
1159  (failures == 0) ? "passed" : "failed", testname, tests, failures);
1161  return failures;
1162 }
1163 
1164 int main(int argc, char** argv)
1165 {
1166  int rc = 0;
1167  int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test6a};
1168  int i;
1169 
1170  xml = fopen("TEST-test1.xml", "w");
1171  fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
1172 
1173  setenv("MQTT_C_CLIENT_TRACE", "ON", 1);
1174  setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 1);
1175 
1176  getopts(argc, argv);
1177 
1178  for (i = 0; i < options.iterations; ++i)
1179  {
1180  if (options.test_no == 0)
1181  { /* run all the tests */
1183  rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
1184  }
1185  else
1186  rc = tests[options.test_no](options); /* run just the selected test */
1187  }
1188 
1189  if (rc == 0)
1190  MyLog(LOGA_INFO, "verdict pass");
1191  else
1192  MyLog(LOGA_INFO, "verdict fail");
1193 
1194  fprintf(xml, "</testsuite>\n");
1195  fclose(xml);
1196  return rc;
1197 }
#define assert1(a, b, c, d, e)
Definition: test1.c:245
char * test6_will_topic
Definition: test1.c:934
int MQTTClient_waitForCompletion(MQTTClient handle, MQTTClient_deliveryToken mdt, unsigned long timeout)
Definition: MQTTClient.c:2772
struct Options options
void test2_sendAndReceive(MQTTClient *c, int qos, char *test_topic)
Definition: test1.c:466
enum MQTTPropertyCodes value
char ** haconnections
Definition: test1.c:50
int MQTTClient_receive(MQTTClient handle, char **topicName, int *topicLen, MQTTClient_message **message, unsigned long timeout)
Definition: MQTTClient.c:2674
void test6_connectionLost(void *context, char *cause)
Definition: test1.c:922
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
#define MQTTCLIENT_SUCCESS
Definition: MQTTClient.h:131
int test6(struct Options options)
Definition: test1.c:950
string topic
Definition: test2.py:8
int MQTTClient_isConnected(MQTTClient handle)
Definition: MQTTClient.c:1930
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions *options)
Definition: MQTTClient.c:1644
char * proxy_connection
Definition: test1.c:51
int MQTTClient_publish(MQTTClient handle, const char *topicName, int payloadlen, const void *payload, int qos, int retained, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2387
volatile int test6_will_message_arrived
Definition: test1.c:919
char * connection
int MQTTClient_disconnect(MQTTClient handle, int timeout)
Definition: MQTTClient.c:1908
int test4(struct Options options)
Definition: test1.c:826
START_TIME_TYPE start_clock(void)
Definition: test1.c:208
int test6_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *m)
Definition: test1.c:937
MQTTClient test6_c1
Definition: test1.c:918
struct pubsub_opts opts
Definition: paho_c_pub.c:42
int test6a(struct Options options)
Definition: test1.c:1057
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
#define malloc(x)
Definition: Heap.h:41
long elapsed(START_TIME_TYPE start_time)
Definition: test1.c:233
int test2(struct Options options)
Definition: test1.c:533
#define ARRAY_SIZE(a)
Definition: test1.c:39
int test4_run(int qos)
Definition: test1.c:681
#define MQTTClient_message_initializer
Definition: MQTTClient.h:327
static char msg_buf[512]
Definition: Log.c:122
#define MQTTClient_willOptions_initializer
Definition: MQTTClient.h:639
MQTTClient_message test2_pubmsg
Definition: test1.c:442
int main(int argc, char **argv)
Definition: test1.c:1164
#define LOGA_INFO
Definition: test1.c:145
int MQTTClient_setCallbacks(MQTTClient handle, void *context, MQTTClient_connectionLost *cl, MQTTClient_messageArrived *ma, MQTTClient_deliveryComplete *dc)
Definition: MQTTClient.c:1032
int failures
Definition: test1.c:248
constexpr size_t count()
Definition: core.h:960
int MQTTClient_publishMessage(MQTTClient handle, const char *topicName, MQTTClient_message *message, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2432
int hacount
Definition: test1.c:52
#define LOGA_DEBUG
Definition: test1.c:144
void getopts(int argc, char **argv)
Definition: test1.c:69
volatile int test6_connection_lost_called
Definition: test1.c:920
void test6_deliveryComplete(void *context, MQTTClient_deliveryToken token)
Definition: test1.c:929
void usage(void)
Definition: test1.c:41
int test2_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *m)
Definition: test1.c:449
description
Definition: setup.py:19
int MQTTClient_unsubscribe(MQTTClient handle, const char *topic)
Definition: MQTTClient.c:2239
struct MQTTClient_willOptions::@56 payload
void myassert(char *filename, int lineno, char *description, int value, char *format,...)
Definition: test1.c:269
#define MQTTVERSION_3_1_1
Definition: MQTTAsync.h:203
MQTTClient test6_c2
Definition: test1.c:918
int test2_deliveryCompleted
Definition: test1.c:441
int qos
Definition: test6.c:56
char * test6_will_message
Definition: test1.c:935
const char * topicName
Definition: MQTTClient.h:619
void MQTTClient_freeMessage(MQTTClient_message **message)
Definition: MQTTClient.c:601
void test1_sendAndReceive(MQTTClient *c, int qos, char *test_topic)
Definition: test1.c:296
void * MQTTClient
Definition: MQTTClient.h:246
void MQTTClient_yield(void)
Definition: MQTTClient.c:2730
void test2_deliveryComplete(void *context, MQTTClient_deliveryToken dt)
Definition: test1.c:444
void MQTTClient_destroy(MQTTClient *handle)
Definition: MQTTClient.c:556
FILE * xml
Definition: test1.c:249
int test3(struct Options options)
Definition: test1.c:609
volatile int test2_arrivedcount
Definition: test1.c:440
#define MQTTCLIENT_PERSISTENCE_DEFAULT
void MyLog(int LOGA_level, char *format,...)
Definition: test1.c:149
#define START_TIME_TYPE
Definition: test1.c:206
char * cur_output
Definition: test1.c:252
int MQTTClient_subscribe(MQTTClient handle, const char *topic, int qos)
Definition: MQTTClient.c:2104
char * test_topic
Definition: test11.c:307
MQTTClient c
Definition: test10.c:1656
dictionary context
Definition: test2.py:57
void MQTTClient_free(void *memory)
Definition: MQTTClient.c:612
null localtime_s(...)
Definition: chrono.h:286
#define MQTTCLIENT_PERSISTENCE_NONE
#define MQTTClient_connectOptions_initializer
Definition: MQTTClient.h:953
int test1(struct Options options)
Definition: test1.c:358
int test5(struct Options options)
Definition: test1.c:848
int MQTTClient_deliveryToken
Definition: MQTTClient.h:257
int MQTTClient_getPendingDeliveryTokens(MQTTClient handle, MQTTClient_deliveryToken **tokens)
Definition: MQTTClient.c:2814
char output[3000]
Definition: test1.c:251
char *const * serverURIs
Definition: MQTTClient.h:913
const char * message
Definition: MQTTClient.h:621
enum MQTTReasonCodes rc
Definition: test10.c:1112
int MQTTClient_create(MQTTClient *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTClient.c:507
#define MQTTVERSION_DEFAULT
Definition: MQTTAsync.h:195
void write_test_result(void)
Definition: test1.c:255
MQTTClient_willOptions * will
Definition: MQTTClient.h:866
#define assert(a, b, c, d)
Definition: test1.c:244
START_TIME_TYPE global_start_time
Definition: test1.c:250
int test_no
Definition: test1.c:54
struct MQTTClient_connectOptions::@58 binarypwd
int tests
Definition: test1.c:247


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48