test10.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp. and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs - MQTT 5.0 support
16  *******************************************************************************/
17 
18 
39 #include "MQTTClient.h"
40 #include <string.h>
41 #include <stdlib.h>
42 
43 #if !defined(_WINDOWS)
44  #include <sys/time.h>
45  #include <sys/socket.h>
46  #include <unistd.h>
47  #include <errno.h>
48 #else
49  #include <windows.h>
50  #define setenv(a, b, c) _putenv_s(a, b)
51 #endif
52 
53 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
54 
55 void usage(void)
56 {
57  printf("help!!\n");
58  exit(EXIT_FAILURE);
59 }
60 
61 struct Options
62 {
63  char* connection;
64  char** haconnections;
65  char* proxy_connection;
66  int hacount;
67  int verbose;
68  int test_no;
69  int MQTTVersion;
70  int iterations;
71 } options =
72 {
73  "tcp://localhost:1883",
74  NULL,
75  "tcp://localhost:1884",
76  0,
77  0,
78  0,
80  1,
81 };
82 
83 void getopts(int argc, char** argv)
84 {
85  int count = 1;
86 
87  while (count < argc)
88  {
89  if (strcmp(argv[count], "--test_no") == 0)
90  {
91  if (++count < argc)
92  options.test_no = atoi(argv[count]);
93  else
94  usage();
95  }
96  else if (strcmp(argv[count], "--connection") == 0)
97  {
98  if (++count < argc)
99  {
100  options.connection = argv[count];
101  printf("\nSetting connection to %s\n", options.connection);
102  }
103  else
104  usage();
105  }
106  else if (strcmp(argv[count], "--haconnections") == 0)
107  {
108  if (++count < argc)
109  {
110  char* tok = strtok(argv[count], " ");
111  options.hacount = 0;
112  options.haconnections = malloc(sizeof(char*) * 5);
113  while (tok)
114  {
115  options.haconnections[options.hacount] = malloc(strlen(tok) + 1);
116  strcpy(options.haconnections[options.hacount], tok);
117  options.hacount++;
118  tok = strtok(NULL, " ");
119  }
120  }
121  else
122  usage();
123  }
124  else if (strcmp(argv[count], "--proxy_connection") == 0)
125  {
126  if (++count < argc)
128  else
129  usage();
130  }
131  else if (strcmp(argv[count], "--MQTTversion") == 0)
132  {
133  if (++count < argc)
134  {
135  options.MQTTVersion = atoi(argv[count]);
136  printf("setting MQTT version to %d\n", options.MQTTVersion);
137  }
138  else
139  usage();
140  }
141  else if (strcmp(argv[count], "--iterations") == 0)
142  {
143  if (++count < argc)
144  options.iterations = atoi(argv[count]);
145  else
146  usage();
147  }
148  else if (strcmp(argv[count], "--verbose") == 0)
149  {
150  options.verbose = 1;
151  printf("\nSetting verbose on\n");
152  }
153  count++;
154  }
155 }
156 
157 
158 #define LOGA_DEBUG 0
159 #define LOGA_INFO 1
160 #include <stdarg.h>
161 #include <time.h>
162 #include <sys/timeb.h>
163 void MyLog(int LOGA_level, char* format, ...)
164 {
165  static char msg_buf[256];
166  va_list args;
167 #if defined(_WIN32) || defined(_WINDOWS)
168  struct timeb ts;
169 #else
170  struct timeval ts;
171 #endif
172  struct tm timeinfo;
173 
174  if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
175  return;
176 
177 #if defined(_WIN32) || defined(_WINDOWS)
178  ftime(&ts);
179  localtime_s(&timeinfo, &ts.time);
180 #else
181  gettimeofday(&ts, NULL);
182  localtime_r(&ts.tv_sec, &timeinfo);
183 #endif
184  strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
185 
186 #if defined(_WIN32) || defined(_WINDOWS)
187  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
188 #else
189  sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000);
190 #endif
191 
192  va_start(args, format);
193  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
194  va_end(args);
195 
196  printf("%s\n", msg_buf);
197  fflush(stdout);
198 }
199 
200 
201 #if defined(_WIN32) || defined(_WINDOWS)
202 #define mqsleep(A) Sleep(1000*A)
203 #define START_TIME_TYPE DWORD
204 static DWORD start_time = 0;
206 {
207  return GetTickCount();
208 }
209 #elif defined(AIX)
210 #define mqsleep sleep
211 #define START_TIME_TYPE struct timespec
213 {
214  static struct timespec start;
215  clock_gettime(CLOCK_REALTIME, &start);
216  return start;
217 }
218 #else
219 #define mqsleep sleep
220 #define START_TIME_TYPE struct timeval
221 /* TODO - unused - remove? static struct timeval start_time; */
223 {
224  struct timeval start_time;
225  gettimeofday(&start_time, NULL);
226  return start_time;
227 }
228 #endif
229 
230 
231 #if defined(_WIN32)
232 long elapsed(START_TIME_TYPE start_time)
233 {
234  return GetTickCount() - start_time;
235 }
236 #elif defined(AIX)
237 #define assert(a)
238 long elapsed(struct timespec start)
239 {
240  struct timespec now, res;
241 
242  clock_gettime(CLOCK_REALTIME, &now);
243  ntimersub(now, start, res);
244  return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
245 }
246 #else
247 long elapsed(START_TIME_TYPE start_time)
248 {
249  struct timeval now, res;
250 
251  gettimeofday(&now, NULL);
252  timersub(&now, &start_time, &res);
253  return (res.tv_sec)*1000 + (res.tv_usec)/1000;
254 }
255 #endif
256 
257 
258 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
259 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
260 
261 int tests = 0;
262 int failures = 0;
263 FILE* xml;
265 char output[3000];
267 
268 
270 {
271  long duration = elapsed(global_start_time);
272 
273  fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
274  if (cur_output != output)
275  {
276  fprintf(xml, "%s", output);
277  cur_output = output;
278  }
279  fprintf(xml, "</testcase>\n");
280 }
281 
282 
283 void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
284 {
285  ++tests;
286  if (!value)
287  {
288  va_list args;
289 
290  ++failures;
291  MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
292 
293  va_start(args, format);
294  vprintf(format, args);
295  va_end(args);
296 
297  cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
298  description, filename, lineno);
299  }
300  else
301  MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
302 }
303 
304 
306 {
307  int i = 0;
308 
309  for (i = 0; i < props->count; ++i)
310  {
311  int id = props->array[i].identifier;
312  const char* name = MQTTPropertyName(id);
313  char* intformat = "Property name %s value %d";
314 
315  switch (MQTTProperty_getType(id))
316  {
318  MyLog(LOGA_INFO, intformat, name, props->array[i].value.byte);
319  break;
321  MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer2);
322  break;
324  MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer4);
325  break;
327  MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer4);
328  break;
331  MyLog(LOGA_INFO, "Property name value %s %.*s", name,
332  props->array[i].value.data.len, props->array[i].value.data.data);
333  break;
335  MyLog(LOGA_INFO, "Property name %s key %.*s value %.*s", name,
336  props->array[i].value.data.len, props->array[i].value.data.data,
337  props->array[i].value.value.len, props->array[i].value.value.data);
338  break;
339  }
340  }
341 }
342 
343 struct
344 {
347 {
348  0,
349 };
350 
352 {
353  MQTTClient c = (MQTTClient)context;
354  MyLog(LOGA_INFO, "Callback: disconnected, reason code \"%s\"", MQTTReasonCode_toString(rc));
355  logProperties(props);
356  test_topic_aliases_globals.disconnected = 1;
357 }
358 
359 static int messages_arrived = 0;
360 
361 int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
362 {
363  MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
364  topicName, message->payloadlen, (char*)(message->payload));
365 
366  assert("Message structure version should be 1", message->struct_version == 1,
367  "message->struct_version was %d", message->struct_version);
368  if (message->struct_version == 1)
369  {
370  const int props_count = 0;
371 
372  assert("Properties count should be 0", message->properties.count == props_count,
373  "Properties count was %d\n", message->properties.count);
374  logProperties(&message->properties);
375  }
377 
378  MQTTClient_free(topicName);
379  MQTTClient_freeMessage(&message);
380  return 1;
381 }
382 
383 
385 {
386  int subsqos = 2;
387  MQTTClient c;
397  int rc = 0;
398  int count = 0;
399  char* test_topic = "test_client_topic_aliases";
400  int topicAliasMaximum = 0;
402 
403  fprintf(xml, "<testcase classname=\"test_client_topic_aliases\" name=\"client topic aliases\"");
405  failures = 0;
406  MyLog(LOGA_INFO, "Starting test 1 - client topic aliases");
407 
408  createOpts.MQTTVersion = MQTTVERSION_5;
409  rc = MQTTClient_createWithOptions(&c, options.connection, "client_topic_alias_test",
410  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
411  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
412  if (rc != MQTTCLIENT_SUCCESS)
413  {
414  MQTTClient_destroy(&c);
415  goto exit;
416  }
417 
418  rc = MQTTClient_setCallbacks(c, NULL, NULL, messageArrived, NULL);
419  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
420 
422  assert("Good rc from setDisconnected", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
423 
424  opts.keepAliveInterval = 20;
425  opts.cleanstart = 1;
426  opts.MQTTVersion = options.MQTTVersion;
427  if (options.haconnections != NULL)
428  {
429  opts.serverURIs = options.haconnections;
430  opts.serverURIcount = options.hacount;
431  }
432 
433  MyLog(LOGA_DEBUG, "Connecting");
434  response = MQTTClient_connect5(c, &opts, NULL, NULL);
435  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
436  if (response.reasonCode != MQTTCLIENT_SUCCESS)
437  goto exit;
438 
439  if (response.properties)
440  {
441  logProperties(response.properties);
442  MQTTResponse_free(response);
443  }
444 
445  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
446  pubmsg.payloadlen = 11;
447  pubmsg.qos = 1;
448  pubmsg.retained = 0;
449 
450  /* a Topic Alias of 0 is not allowed, so we should be disconnected */
451  property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
452  property.value.integer2 = 0;
453  MQTTProperties_add(&pubmsg.properties, &property);
454 
455  response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
456  assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
457 
458  /* Now we expect to receive a disconnect packet telling us why */
459  count = 0;
460  while (test_topic_aliases_globals.disconnected == 0 && ++count < 10)
461  {
462 #if defined(_WIN32)
463  Sleep(1000);
464 #else
465  usleep(1000000L);
466 #endif
467  }
468  assert("Disconnected should be called", test_topic_aliases_globals.disconnected == 1,
469  "was %d", test_topic_aliases_globals.disconnected);
470 
471  property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
472  property.value.integer4 = 30;
473  MQTTProperties_add(&connect_props, &property);
474 
475  /* Now try a valid topic alias */
476  response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
477  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
478  if (response.reasonCode != MQTTCLIENT_SUCCESS)
479  goto exit;
480 
481  if (response.properties)
482  {
485 
486  logProperties(response.properties);
487  MQTTResponse_free(response);
488  }
489  assert("topicAliasMaximum > 0", topicAliasMaximum > 0, "topicAliasMaximum was %d", topicAliasMaximum);
490 
491  /* subscribe to a topic */
492  response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
493  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
494 
495  /* then publish to the topic */
497  property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
498  property.value.integer2 = 1;
499  MQTTProperties_add(&pubmsg.properties, &property);
500 
501  messages_arrived = 0;
502  response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
503  assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
504 
505  /* should get a response */
506  while (messages_arrived == 0 && ++count < 10)
507  {
508 #if defined(_WIN32)
509  Sleep(1000);
510 #else
511  usleep(1000000L);
512 #endif
513  }
514  assert("1 message should have arrived", messages_arrived == 1, "was %d", messages_arrived);
515 
516  /* now publish to the topic alias only */
517  messages_arrived = 0;
518  response = MQTTClient_publishMessage5(c, "", &pubmsg, &dt);
519  assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
520 
521  /* should get a response */
522  while (messages_arrived == 0 && ++count < 10)
523  {
524 #if defined(_WIN32)
525  Sleep(1000);
526 #else
527  usleep(1000000L);
528 #endif
529  }
530  assert("1 message should have arrived", messages_arrived == 1, "was %d", messages_arrived);
531 
532  rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
533 
534  /* Reconnect. Topic aliases should be deleted, but not subscription */
535  opts.cleanstart = 0;
536  response = MQTTClient_connect5(c, &opts, NULL, NULL);
537  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
538  MQTTResponse_free(response);
539  if (response.reasonCode != MQTTCLIENT_SUCCESS)
540  goto exit;
541 
542  /* then publish to the topic */
544  messages_arrived = 0;
545  response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
546  assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
547 
548  /* should get a response */
549  while (messages_arrived == 0 && ++count < 10)
550  {
551 #if defined(_WIN32)
552  Sleep(1000);
553 #else
554  usleep(1000000L);
555 #endif
556  }
557  assert("1 message should have arrived", messages_arrived == 1, "was %d", messages_arrived);
558 
559  /* now publish to the topic alias only */
560  test_topic_aliases_globals.disconnected = 0;
561  messages_arrived = 0;
562  property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
563  property.value.integer2 = 1;
564  MQTTProperties_add(&pubmsg.properties, &property);
565  response = MQTTClient_publishMessage5(c, "", &pubmsg, &dt);
566  assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
567 
568  /* should not get a response */
569  while (messages_arrived == 0 && ++count < 10)
570  {
571 #if defined(_WIN32)
572  Sleep(1000);
573 #else
574  usleep(1000000L);
575 #endif
576  }
577  assert("No message should have arrived", messages_arrived == 0, "was %d", messages_arrived);
578 
579  /* Now we expect to receive a disconnect packet telling us why */
580  count = 0;
581  while (test_topic_aliases_globals.disconnected == 0 && ++count < 10)
582  {
583 #if defined(_WIN32)
584  Sleep(1000);
585 #else
586  usleep(1000000L);
587 #endif
588  }
589  assert("Disconnected should be called", test_topic_aliases_globals.disconnected == 1,
590  "was %d", test_topic_aliases_globals.disconnected);
591 
593  MQTTProperties_free(&props);
594  MQTTProperties_free(&connect_props);
595  MQTTClient_destroy(&c);
596 
597 exit:
598  MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
599  (failures == 0) ? "passed" : "failed", tests, failures);
601  return failures;
602 }
603 
604 
605 
606 int test2_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
607 {
608  static int received = 0;
609  static int first_topic_alias = 0;
610  int topicAlias = 0;
611 
612  received++;
613  MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
614  topicName, message->payloadlen, (char*)(message->payload));
615 
616  assert("Message structure version should be 1", message->struct_version == 1,
617  "message->struct_version was %d", message->struct_version);
618  if (message->struct_version == 1)
619  {
620  const int props_count = 0;
621 
624 
625  if (received == 1)
626  first_topic_alias = topicAlias;
627  else
628  assert("All topic aliases should be the same", topicAlias == first_topic_alias,
629  "Topic alias was %d\n", topicAlias);
630 
631  assert("topicAlias should not be 0", topicAlias > 0, "Topic alias was %d\n", topicAlias);
632  logProperties(&message->properties);
633  }
635 
636  MQTTClient_free(topicName);
637  MQTTClient_freeMessage(&message);
638  return 1;
639 }
640 
641 
643 {
644  int subsqos = 2;
645  MQTTClient c;
653  int rc = 0;
654  int count = 0;
655  char* test_topic = "test_server_topic_aliases";
656  int topicAliasMaximum = 0;
657  int qos = 0;
658  const int msg_count = 3;
660 
661  fprintf(xml, "<testcase classname=\"test_server_topic_aliases\" name=\"server topic aliases\"");
663  failures = 0;
664  MyLog(LOGA_INFO, "Starting test 2 - server topic aliases");
665 
666  createOpts.MQTTVersion = MQTTVERSION_5;
667  rc = MQTTClient_createWithOptions(&c, options.connection, "server_topic_alias_test",
668  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
669  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
670  if (rc != MQTTCLIENT_SUCCESS)
671  {
672  MQTTClient_destroy(&c);
673  goto exit;
674  }
675 
676  rc = MQTTClient_setCallbacks(c, NULL, NULL, test2_messageArrived, NULL);
677  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
678 
679  opts.keepAliveInterval = 20;
680  opts.cleanstart = 1;
681  opts.MQTTVersion = options.MQTTVersion;
682  if (options.haconnections != NULL)
683  {
684  opts.serverURIs = options.haconnections;
685  opts.serverURIcount = options.hacount;
686  }
687 
688  /* Allow at least one server topic alias */
689  property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM;
690  property.value.integer2 = 1;
691  MQTTProperties_add(&connect_props, &property);
692 
693  MyLog(LOGA_DEBUG, "Connecting");
694  response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
695  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
696  if (response.reasonCode != MQTTCLIENT_SUCCESS)
697  goto exit;
698 
699  if (response.properties)
700  {
703 
704  logProperties(response.properties);
705  MQTTResponse_free(response);
706  }
707 
708  /* subscribe to a topic */
709  response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
710  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
711 
712  messages_arrived = 0;
713  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
714  pubmsg.payloadlen = 11;
715  pubmsg.retained = 0;
716  for (qos = 0; qos < msg_count; ++qos)
717  {
718  pubmsg.qos = qos;
719  response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
720  assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
721  }
722 
723  /* should get responses */
724  while (messages_arrived < msg_count && ++count < 10)
725  {
726 #if defined(_WIN32)
727  Sleep(1000);
728 #else
729  usleep(1000000L);
730 #endif
731  }
732  assert("3 messages should have arrived", messages_arrived == msg_count, "was %d", messages_arrived);
733 
734  rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
735 
737  MQTTProperties_free(&connect_props);
738  MQTTClient_destroy(&c);
739 
740 exit:
741  MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.",
742  (failures == 0) ? "passed" : "failed", tests, failures);
744  return failures;
745 }
746 
747 
748 
749 int test_subscription_ids_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
750 {
751  static int received = 0;
752  static int first_topic_alias = 0;
753  int topicAlias = 0;
754 
755  received++;
756  MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
757  topicName, message->payloadlen, (char*)(message->payload));
758 
759  assert("Message structure version should be 1", message->struct_version == 1,
760  "message->struct_version was %d", message->struct_version);
761  if (message->struct_version == 1)
762  {
763  int subsidcount = 0, i = 0;
764 
766 
767  for (i = 0; i < subsidcount; ++i)
768  {
770  assert("Subsid is i+1", subsid == i+1, "subsid is not correct %d\n", subsid);
771  }
772  logProperties(&message->properties);
773  }
775 
776  MQTTClient_free(topicName);
777  MQTTClient_freeMessage(&message);
778  return 1;
779 }
780 
781 
783 {
784  int subsqos = 2;
785  MQTTClient c;
793  int rc = 0;
794  int count = 0;
795  char* test_topic = "test_subscription_ids";
796  const int msg_count = 1;
797  int subsids = 1;
799 
800  fprintf(xml, "<testcase classname=\"test_subscription_ids\" name=\"subscription ids\"");
802  failures = 0;
803  MyLog(LOGA_INFO, "Starting test 3 - subscription ids");
804 
805  createOpts.MQTTVersion = MQTTVERSION_5;
806  rc = MQTTClient_createWithOptions(&c, options.connection, "subscription_ids",
807  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
808  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
809  if (rc != MQTTCLIENT_SUCCESS)
810  {
811  MQTTClient_destroy(&c);
812  goto exit;
813  }
814 
816  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
817 
818  opts.keepAliveInterval = 20;
819  opts.cleanstart = 1;
820  opts.MQTTVersion = options.MQTTVersion;
821  if (options.haconnections != NULL)
822  {
823  opts.serverURIs = options.haconnections;
824  opts.serverURIcount = options.hacount;
825  }
826 
827  MyLog(LOGA_DEBUG, "Connecting");
828  response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
829  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
830  if (response.reasonCode != MQTTCLIENT_SUCCESS)
831  goto exit;
832 
833  if (response.properties)
834  {
837 
838  logProperties(response.properties);
839  MQTTResponse_free(response);
840  }
841  assert("Subscription ids must be available", subsids == 1, "subsids is %d", subsids);
842 
843  /* subscribe to the test topic */
844  property.identifier = MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER;
845  property.value.integer4 = 1;
846  MQTTProperties_add(&subs_props, &property);
847  response = MQTTClient_subscribe5(c, test_topic, 2, NULL, &subs_props);
848  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
849 
850  /* now to an overlapping topic */
851  property.value.integer4 = 2;
852  subs_props.array[0].value.integer4 = 2;
853  response = MQTTClient_subscribe5(c, "+", 2, NULL, &subs_props);
854  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
855 
856  messages_arrived = 0;
857  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
858  pubmsg.payloadlen = 11;
859  pubmsg.retained = 0;
860  pubmsg.qos = 2;
861 
862  response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
863  assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
864 
865  /* should get responses */
866  while (messages_arrived < msg_count && ++count < 10)
867  {
868 #if defined(_WIN32)
869  Sleep(1000);
870 #else
871  usleep(1000000L);
872 #endif
873  }
874  assert("1 message should have arrived", messages_arrived == msg_count, "was %d", messages_arrived);
875 
876  rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
877 
879  MQTTProperties_free(&subs_props);
880  MQTTProperties_free(&connect_props);
881  MQTTClient_destroy(&c);
882 
883 exit:
884  MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
885  (failures == 0) ? "passed" : "failed", tests, failures);
887  return failures;
888 }
889 
890 
891 int test_flow_control_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
892 {
893  static int received = 0;
894  static int first_topic_alias = 0;
895  int topicAlias = 0;
896 
897  received++;
898  MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
899  topicName, message->payloadlen, (char*)(message->payload));
900 
901  assert("Message structure version should be 1", message->struct_version == 1,
902  "message->struct_version was %d", message->struct_version);
904 
905  MQTTClient_free(topicName);
906  MQTTClient_freeMessage(&message);
907  return 1;
908 }
909 
910 static int blocking_found = 0;
911 
913 {
914  static char* msg = "Blocking publish on queue full";
915 
916  if (strstr(message, msg) != NULL)
917  blocking_found = 1;
918 }
919 
920 
922 {
923  int subsqos = 2;
924  MQTTClient c;
930  int rc = 0, i = 0, count = 0;
931  char* test_topic = "test_flow_control";
932  int receive_maximum = 65535;
934 
935  fprintf(xml, "<testcase classname=\"test_flow_control\" name=\"flow control\"");
937  failures = 0;
938  MyLog(LOGA_INFO, "Starting test - flow control");
939 
940  //MQTTClient_setTraceCallback(test_flow_control_trace_callback);
941 
942  createOpts.MQTTVersion = MQTTVERSION_5;
943  rc = MQTTClient_createWithOptions(&c, options.connection, "flow_control",
944  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
945  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
946  if (rc != MQTTCLIENT_SUCCESS)
947  goto exit;
948 
950  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
951 
952  opts.keepAliveInterval = 20;
953  opts.cleanstart = 1;
954  opts.MQTTVersion = options.MQTTVersion;
955  opts.reliable = 0;
956  opts.maxInflightMessages = 100;
957  if (options.haconnections != NULL)
958  {
959  opts.serverURIs = options.haconnections;
960  opts.serverURIcount = options.hacount;
961  }
962 
963  MyLog(LOGA_DEBUG, "Connecting");
964  response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
965  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
966  if (response.reasonCode != MQTTCLIENT_SUCCESS)
967  goto exit;
968 
969  if (response.properties)
970  {
973 
974  logProperties(response.properties);
975  MQTTResponse_free(response);
976  }
977 
978  response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
979  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
980 
981  messages_arrived = 0;
982  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
983  pubmsg.payloadlen = 11;
984  pubmsg.retained = 0;
985  pubmsg.qos = 2;
986  for (i = 0; i < receive_maximum + 2; ++i)
987  {
988  response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
989  assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
990  }
991 
992  /* should get responses */
993  while (messages_arrived < receive_maximum + 2 && ++count < 10)
994  {
995 #if defined(_WIN32)
996  Sleep(1000);
997 #else
998  usleep(1000000L);
999 #endif
1000  }
1001  assert("messages should have arrived", messages_arrived == receive_maximum + 2, "was %d", messages_arrived);
1002  assert("should have blocked", blocking_found == 1, "was %d\n", blocking_found);
1003 
1004  rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
1005 
1006 exit:
1009  MQTTProperties_free(&connect_props);
1010  MQTTClient_destroy(&c);
1011 
1012  MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
1013  (failures == 0) ? "passed" : "failed", tests, failures);
1015  return failures;
1016 }
1017 
1018 
1020 {
1021  int subsqos = 2;
1022  MQTTClient c;
1027  int rc = 0, i = 0, count = 0;
1028  char* test_topic = "test_error_reporting";
1029  int receive_maximum = 65535;
1031 
1032  fprintf(xml, "<testcase classname=\"test_error_reporting\" name=\"error reporting\"");
1034  failures = 0;
1035  MyLog(LOGA_INFO, "Starting test - error reporting");
1036 
1037  //MQTTClient_setTraceCallback(test_flow_control_trace_callback);
1038 
1039  createOpts.MQTTVersion = MQTTVERSION_5;
1040  rc = MQTTClient_createWithOptions(&c, options.connection, "error_reporting",
1041  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
1042  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1043  if (rc != MQTTCLIENT_SUCCESS)
1044  goto exit;
1045 
1046  rc = MQTTClient_setCallbacks(c, NULL, NULL, test_flow_control_messageArrived, NULL);
1047  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1048 
1049  opts.MQTTVersion = options.MQTTVersion;
1050  if (options.haconnections != NULL)
1051  {
1052  opts.serverURIs = options.haconnections;
1053  opts.serverURIcount = options.hacount;
1054  }
1055 
1056  MyLog(LOGA_DEBUG, "Connecting");
1057  response = MQTTClient_connect5(c, &opts, NULL, NULL);
1058  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
1059  if (response.reasonCode != MQTTCLIENT_SUCCESS)
1060  goto exit;
1061 
1062  if (response.properties)
1063  {
1066 
1067  logProperties(response.properties);
1068  MQTTResponse_free(response);
1069  }
1070 
1071  property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
1072  property.value.data.data = "unsub user property";
1073  property.value.data.len = (int)strlen(property.value.data.data);
1074  property.value.value.data = "unsub user property value";
1075  property.value.value.len = (int)strlen(property.value.value.data);
1076  MQTTProperties_add(&props, &property);
1077  response = MQTTClient_subscribe5(c, test_topic, 2, NULL, &props);
1078  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
1079  assert("Properties should exist", response.properties != NULL, "props was %p", response.properties);
1080  if (response.properties)
1081  {
1082  logProperties(response.properties);
1083  MQTTResponse_free(response);
1084  }
1085 
1086  response = MQTTClient_unsubscribe5(c, test_topic, &props);
1087  assert("Good rc from unsubscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
1088  assert("Properties should exist", response.properties != NULL, "props was %p", response.properties);
1089  if (response.properties)
1090  {
1091  logProperties(response.properties);
1092  MQTTResponse_free(response);
1093  }
1094 
1095  rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
1096 
1097 exit:
1099  MQTTProperties_free(&props);
1100  MQTTClient_destroy(&c);
1101 
1102  MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
1103  (failures == 0) ? "passed" : "failed", tests, failures);
1105  return failures;
1106 }
1107 
1108 struct
1109 {
1114 {
1115  0, -1, MQTTREASONCODE_SUCCESS
1116 };
1117 
1118 
1120 {
1121  MQTTClient c = (MQTTClient)context;
1122  MyLog(LOGA_INFO, "Callback: published, reason code \"%s\" msgid: %d packet type: %d",
1123  MQTTReasonCode_toString(rc), msgid, packet_type);
1126  if (props)
1127  {
1128  MyLog(LOGA_INFO, "Callback: published, properties:");
1129  logProperties(props);
1130  }
1131  test_qos_1_2_errors_globals.published = 1;
1132 }
1133 
1134 void test_trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
1135 {
1136  printf("%s\n", message);
1137 }
1138 
1140 {
1144 };
1145 
1146 
1148 {
1149  int subsqos = 2;
1150  MQTTClient c;
1157  int rc = 0, i = 0, count = 0;
1158  char* test_topic = "test_qos_1_2_errors";
1159  int receive_maximum = 65535;
1161 
1162  fprintf(xml, "<testcase classname=\"test_qos_1_2_errors\" name=\"qos 1 2 errors\"");
1164  failures = 0;
1165  MyLog(LOGA_INFO, "Starting test - qos 1 and 2 errors");
1166 
1167  //MQTTClient_setTraceCallback(test_trace_callback);
1168 
1169  createOpts.MQTTVersion = MQTTVERSION_5;
1170  rc = MQTTClient_createWithOptions(&c, options.connection, "error_reporting",
1171  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
1172  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1173  if (rc != MQTTCLIENT_SUCCESS)
1174  goto exit;
1175 
1176  rc = MQTTClient_setCallbacks(c, NULL, NULL, test_flow_control_messageArrived, NULL);
1177  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1178 
1179  rc = MQTTClient_setPublished(c, c, published);
1180  assert("Good rc from setPublished", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1181 
1182  opts.MQTTVersion = options.MQTTVersion;
1183  if (options.haconnections != NULL)
1184  {
1185  opts.serverURIs = options.haconnections;
1186  opts.serverURIcount = options.hacount;
1187  }
1188 
1189  MyLog(LOGA_DEBUG, "Connecting");
1190  response = MQTTClient_connect5(c, &opts, NULL, NULL);
1191  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
1192  if (response.reasonCode != MQTTCLIENT_SUCCESS)
1193  goto exit;
1194 
1195  if (response.properties)
1196  {
1199 
1200  logProperties(response.properties);
1201  MQTTResponse_free(response);
1202  }
1203 
1204  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
1205  pubmsg.payloadlen = 11;
1206  pubmsg.qos = 1;
1207  pubmsg.retained = 0;
1208 
1209  property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
1210  property.value.data.data = "unsub user property";
1211  property.value.data.len = (int)strlen(property.value.data.data);
1212  property.value.value.data = "unsub user property value";
1213  property.value.value.len = (int)strlen(property.value.value.data);
1214  MQTTProperties_add(&pubmsg.properties, &property);
1215 
1216  response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
1217  assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
1218 
1219  count = 0;
1220  while (test_qos_1_2_errors_globals.published == 0 && ++count < 10)
1221  {
1222 #if defined(_WIN32)
1223  Sleep(1000);
1224 #else
1225  usleep(1000000L);
1226 #endif
1227  }
1228  assert("Published called", test_qos_1_2_errors_globals.published == 1,
1229  "published was %d", test_qos_1_2_errors_globals.published);
1230  assert("Reason code was packet identifier not found",
1232  "Reason code was %d", test_qos_1_2_errors_globals.rc);
1233  assert("Packet type was PUBACK", test_qos_1_2_errors_globals.packet_type == PUBACK,
1234  "packet type was %d", test_qos_1_2_errors_globals.packet_type);
1235 
1236  test_qos_1_2_errors_globals.published = 0;
1237  pubmsg.qos = 2;
1238  response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
1239  assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
1240 
1241  count = 0;
1242  while (test_qos_1_2_errors_globals.published == 0 && ++count < 10)
1243  {
1244 #if defined(_WIN32)
1245  Sleep(1000);
1246 #else
1247  usleep(1000000L);
1248 #endif
1249  }
1250  assert("Published called", test_qos_1_2_errors_globals.published == 1,
1251  "published was %d", test_qos_1_2_errors_globals.published);
1252  assert("Reason code was packet identifier not found",
1254  "Reason code was %d", test_qos_1_2_errors_globals.rc);
1255  assert("Packet type was PUBREC", test_qos_1_2_errors_globals.packet_type == PUBREC,
1256  "packet type was %d", test_qos_1_2_errors_globals.packet_type);
1257 
1258  test_qos_1_2_errors_globals.published = 0;
1259  response = MQTTClient_publishMessage5(c, "test_qos_1_2_errors_pubcomp", &pubmsg, &dt);
1260  assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
1261 
1262  count = 0;
1263  while (test_qos_1_2_errors_globals.published == 0 && ++count < 10)
1264  {
1265 #if defined(_WIN32)
1266  Sleep(1000);
1267 #else
1268  usleep(1000000L);
1269 #endif
1270  }
1271  assert("Published called", test_qos_1_2_errors_globals.published == 1,
1272  "published was %d", test_qos_1_2_errors_globals.published);
1273  assert("Reason code was packet identifier not found",
1275  "Reason code was %d", test_qos_1_2_errors_globals.rc);
1276  assert("Packet type was PUBCOMP", test_qos_1_2_errors_globals.packet_type == PUBCOMP,
1277  "packet type was %d", test_qos_1_2_errors_globals.packet_type);
1278 
1279  rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
1280 
1281 exit:
1283  MQTTProperties_free(&props);
1284  MQTTClient_destroy(&c);
1285 
1286  MyLog(LOGA_INFO, "TEST6: test %s. %d tests run, %d failures.",
1287  (failures == 0) ? "passed" : "failed", tests, failures);
1289  return failures;
1290 }
1291 
1292 
1293 struct
1294 {
1297  int messages_arrived;
1300 {
1301  "my response topic",
1302  "my request topic",
1303  0,
1304  "request no 1",
1305 };
1306 
1307 
1308 int test_request_response_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
1309 {
1310  MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
1311  topicName, message->payloadlen, (char*)(message->payload));
1312 
1313  assert("Message structure version should be 1", message->struct_version == 1,
1314  "message->struct_version was %d", message->struct_version);
1315  if (message->struct_version == 1)
1316  {
1317  const int props_count = 0;
1318 
1319  MyLog(LOGA_INFO, "Message properties:");
1320  logProperties(&message->properties);
1321  }
1322  test_request_response_globals.messages_arrived++;
1323 
1324  if (test_request_response_globals.messages_arrived == 1)
1325  {
1326  MQTTProperty *prop;
1327 
1328  assert("Topic should be request",
1329  strcmp(test_request_response_globals.request_topic, topicName) == 0,
1330  "topic was %s\n", topicName);
1331 
1334 
1335  assert("Topic should be response",
1336  strncmp(test_request_response_globals.response_topic, prop->value.data.data, prop->value.data.len) == 0,
1337  "topic was %.4s\n", prop->value.data.data);
1338 
1341 
1342  assert("Correlation data should be",
1343  strncmp(test_request_response_globals.correlation_id, prop->value.data.data, prop->value.data.len) == 0,
1344  "Correlation data was %.4s\n", prop->value.data.data);
1345 
1346  }
1347  else if (test_request_response_globals.messages_arrived == 2)
1348  {
1349  assert("Topic should be response",
1350  strcmp(test_request_response_globals.response_topic, topicName) == 0,
1351  "topic was %s\n", topicName);
1352  }
1353 
1354  MQTTClient_free(topicName);
1355  MQTTClient_freeMessage(&message);
1356  return 1;
1357 }
1358 
1359 
1361 {
1362  int subsqos = 2;
1363  MQTTClient c;
1371  int rc = 0;
1372  int count = 0;
1373  char* test_topic = "test_request_response";
1374  const int msg_count = 1;
1375  int subsids = 1;
1377 
1378  fprintf(xml, "<testcase classname=\"test_request_response\" name=\"request/response\"");
1380  failures = 0;
1381  MyLog(LOGA_INFO, "Starting test 7 - request response");
1382 
1383  createOpts.MQTTVersion = MQTTVERSION_5;
1384  rc = MQTTClient_createWithOptions(&c, options.connection, "request_response",
1385  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
1386  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1387  if (rc != MQTTCLIENT_SUCCESS)
1388  {
1389  MQTTClient_destroy(&c);
1390  goto exit;
1391  }
1392 
1394  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1395 
1396  opts.keepAliveInterval = 20;
1397  opts.cleanstart = 1;
1398  opts.MQTTVersion = options.MQTTVersion;
1399  if (options.haconnections != NULL)
1400  {
1401  opts.serverURIs = options.haconnections;
1402  opts.serverURIcount = options.hacount;
1403  }
1404 
1405  MyLog(LOGA_DEBUG, "Connecting");
1406  response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
1407  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
1408  if (response.reasonCode != MQTTCLIENT_SUCCESS)
1409  goto exit;
1410 
1411  if (response.properties)
1412  {
1415 
1416  MyLog(LOGA_INFO, "Connack properties:");
1417  logProperties(response.properties);
1418  MQTTResponse_free(response);
1419  }
1420 
1421  response = MQTTClient_subscribe5(c, test_request_response_globals.response_topic, 2, NULL, NULL);
1422  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
1423 
1424  response = MQTTClient_subscribe5(c, test_request_response_globals.request_topic, 2, NULL, NULL);
1425  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
1426 
1427  messages_arrived = 0;
1428  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
1429  pubmsg.payloadlen = 11;
1430  pubmsg.retained = 0;
1431  pubmsg.qos = 2;
1432 
1433  property.identifier = MQTTPROPERTY_CODE_RESPONSE_TOPIC;
1434  property.value.data.data = test_request_response_globals.response_topic;
1435  property.value.data.len = (int)strlen(property.value.data.data);
1436  MQTTProperties_add(&pubmsg.properties, &property);
1437 
1438  property.identifier = MQTTPROPERTY_CODE_CORRELATION_DATA;
1439  property.value.data.data = test_request_response_globals.correlation_id;
1440  property.value.data.len = (int)strlen(property.value.data.data);
1441  MQTTProperties_add(&pubmsg.properties, &property);
1442 
1443  response = MQTTClient_publishMessage5(c, test_request_response_globals.request_topic, &pubmsg, &dt);
1444  assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
1445 
1446  /* should get the request */
1447  while (test_request_response_globals.messages_arrived < 1 && ++count < 10)
1448  {
1449 #if defined(_WIN32)
1450  Sleep(1000);
1451 #else
1452  usleep(1000000L);
1453 #endif
1454  }
1455  assert("1 message should have arrived", test_request_response_globals.messages_arrived == 1, "was %d",
1456  test_request_response_globals.messages_arrived);
1457 
1459  property.identifier = MQTTPROPERTY_CODE_CORRELATION_DATA;
1460  property.value.data.data = "request no 1";
1461  property.value.data.len = (int)strlen(property.value.data.data);
1462  MQTTProperties_add(&pubmsg.properties, &property);
1463 
1464  response = MQTTClient_publishMessage5(c, test_request_response_globals.response_topic, &pubmsg, &dt);
1465  assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
1466 
1467  /* should get the response */
1468  while (test_request_response_globals.messages_arrived < 1 && ++count < 10)
1469  {
1470 #if defined(_WIN32)
1471  Sleep(1000);
1472 #else
1473  usleep(1000000L);
1474 #endif
1475  }
1476  assert("1 message should have arrived", test_request_response_globals.messages_arrived == 1, "was %d",
1477  test_request_response_globals.messages_arrived);
1478 
1479  rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
1480 
1482  MQTTProperties_free(&subs_props);
1483  MQTTProperties_free(&connect_props);
1484  MQTTClient_destroy(&c);
1485 
1486 exit:
1487  MyLog(LOGA_INFO, "TEST7: test %s. %d tests run, %d failures.",
1488  (failures == 0) ? "passed" : "failed", tests, failures);
1490  return failures;
1491 }
1492 
1493 
1494 struct
1495 {
1496  char* topic;
1497  int messages_arrived;
1499 {
1500  "my response topic",
1501  0,
1502 };
1503 
1504 
1505 int test_subscribe_options_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
1506 {
1507  int subsidcount = 0, i = 0, subsid = -1;
1508 
1509  MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
1510  topicName, message->payloadlen, (char*)(message->payload));
1511 
1512  assert("Message structure version should be 1", message->struct_version == 1,
1513  "message->struct_version was %d", message->struct_version);
1514  if (message->struct_version == 1)
1515  {
1516  const int props_count = 0;
1517 
1518  MyLog(LOGA_INFO, "Message properties:");
1519  logProperties(&message->properties);
1520  }
1521  test_subscribe_options_globals.messages_arrived++;
1522 
1523  if (test_subscribe_options_globals.messages_arrived == 1)
1524  {
1526  assert("Subsidcount is i", subsidcount == 1, "subsidcount is not correct %d\n", subsidcount);
1527 
1529  assert("Subsid is 2", subsid == 2, "subsid is not correct %d\n", subsid);
1530  }
1531 
1532  MQTTClient_free(topicName);
1533  MQTTClient_freeMessage(&message);
1534  return 1;
1535 }
1536 
1537 
1539 {
1540  int subsqos = 2;
1541  MQTTClient c;
1549  int rc = 0;
1550  int count = 0;
1551  const int msg_count = 1;
1554 
1555  fprintf(xml, "<testcase classname=\"test_subscribe_options\" name=\"subscribe options\"");
1557  failures = 0;
1558  MyLog(LOGA_INFO, "Starting test 8 - subscribe options");
1559 
1560  createOpts.MQTTVersion = MQTTVERSION_5;
1561  rc = MQTTClient_createWithOptions(&c, options.connection, "subscribe_options",
1562  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
1563  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1564  if (rc != MQTTCLIENT_SUCCESS)
1565  {
1566  MQTTClient_destroy(&c);
1567  goto exit;
1568  }
1569 
1571  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1572 
1573  opts.keepAliveInterval = 20;
1574  opts.cleanstart = 1;
1575  opts.MQTTVersion = options.MQTTVersion;
1576  if (options.haconnections != NULL)
1577  {
1578  opts.serverURIs = options.haconnections;
1579  opts.serverURIcount = options.hacount;
1580  }
1581 
1582  MyLog(LOGA_DEBUG, "Connecting");
1583  response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
1584  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
1585  if (response.reasonCode != MQTTCLIENT_SUCCESS)
1586  goto exit;
1587 
1588  if (response.properties)
1589  {
1590  MyLog(LOGA_INFO, "Connack properties:");
1591  logProperties(response.properties);
1592  MQTTResponse_free(response);
1593  }
1594 
1595  property.identifier = MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER;
1596  property.value.integer4 = 1;
1597  MQTTProperties_add(&subs_props, &property);
1598  subopts.noLocal = 1;
1599  response = MQTTClient_subscribe5(c, test_subscribe_options_globals.topic, 2, &subopts, &subs_props);
1600  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
1601 
1602  subs_props.array[0].value.integer4 = 2;
1603  subopts.noLocal = 0;
1604  subopts.retainHandling = 1;
1605  response = MQTTClient_subscribe5(c, "#", 2, &subopts, &subs_props);
1606  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
1607 
1608  messages_arrived = 0;
1609  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
1610  pubmsg.payloadlen = 11;
1611  pubmsg.retained = 0;
1612  pubmsg.qos = 2;
1613 
1614  response = MQTTClient_publishMessage5(c, test_subscribe_options_globals.topic, &pubmsg, &dt);
1615  assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
1616 
1617  /* should get the request */
1618  while (test_subscribe_options_globals.messages_arrived < 1 && ++count < 10)
1619  {
1620 #if defined(_WIN32)
1621  Sleep(1000);
1622 #else
1623  usleep(1000000L);
1624 #endif
1625  }
1626  assert("1 message should have arrived", test_subscribe_options_globals.messages_arrived == 1, "was %d",
1627  test_subscribe_options_globals.messages_arrived);
1628 
1629  rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
1630 
1632  MQTTProperties_free(&subs_props);
1633  MQTTProperties_free(&connect_props);
1634  MQTTClient_destroy(&c);
1635 
1636 exit:
1637  MyLog(LOGA_INFO, "TEST8: test %s. %d tests run, %d failures.",
1638  (failures == 0) ? "passed" : "failed", tests, failures);
1640  return failures;
1641 }
1642 
1643 
1644 struct
1645 {
1647  char* topic;
1648  int messages_arrived;
1650 {
1651  "$share/share_test/#",
1652  "a",
1653  0,
1654 };
1655 
1657 
1658 int test_shared_subscriptions_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
1659 {
1660  int subsidcount = 0, i = 0, subsid = -1;
1661 
1662  MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
1663  topicName, message->payloadlen, (char*)(message->payload));
1664 
1665  assert("Message structure version should be 1", message->struct_version == 1,
1666  "message->struct_version was %d", message->struct_version);
1667  if (message->struct_version == 1)
1668  {
1669  const int props_count = 0;
1670 
1671  if (message->properties.count > 0)
1672  {
1673  MyLog(LOGA_INFO, "Message properties:");
1674  logProperties(&message->properties);
1675  }
1676  }
1677  test_shared_subscriptions_globals.messages_arrived++;
1678 
1679  MQTTClient_free(topicName);
1680  MQTTClient_freeMessage(&message);
1681  return 1;
1682 }
1683 
1684 
1686 {
1687  int subsqos = 2;
1694  int rc = 0;
1695  int count = 0;
1696  const int msg_count = 1;
1698  int i;
1700 
1701  fprintf(xml, "<testcase classname=\"test_shared_subscriptions\" name=\"shared subscriptions\"");
1703  failures = 0;
1704  MyLog(LOGA_INFO, "Starting test 8 - shared subscriptions");
1705 
1706  createOpts.MQTTVersion = MQTTVERSION_5;
1707  rc = MQTTClient_createWithOptions(&c, options.connection, "shared_subscriptions",
1708  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
1709  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1710  if (rc != MQTTCLIENT_SUCCESS)
1711  {
1713  goto exit;
1714  }
1715 
1716  rc = MQTTClient_createWithOptions(&d, options.connection, "shared_subscriptions_1",
1717  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
1718  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1719  if (rc != MQTTCLIENT_SUCCESS)
1720  {
1722  goto exit;
1723  }
1724 
1726  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1727 
1729  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1730 
1731  opts.keepAliveInterval = 20;
1732  opts.cleanstart = 1;
1733  opts.MQTTVersion = options.MQTTVersion;
1734  if (options.haconnections != NULL)
1735  {
1736  opts.serverURIs = options.haconnections;
1737  opts.serverURIcount = options.hacount;
1738  }
1739 
1740  MyLog(LOGA_DEBUG, "Connecting");
1741  response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
1742  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
1743  if (response.reasonCode != MQTTCLIENT_SUCCESS)
1744  goto exit;
1745 
1746  if (response.properties)
1747  {
1748  MyLog(LOGA_INFO, "Connack properties:");
1749  logProperties(response.properties);
1750  MQTTResponse_free(response);
1751  }
1752 
1753  MyLog(LOGA_DEBUG, "Connecting");
1754  response = MQTTClient_connect5(d, &opts, &connect_props, NULL);
1755  assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
1756  if (response.reasonCode != MQTTCLIENT_SUCCESS)
1757  goto exit;
1758 
1759  if (response.properties)
1760  {
1761  MyLog(LOGA_INFO, "Connack properties:");
1762  logProperties(response.properties);
1763  MQTTResponse_free(response);
1764  }
1765 
1766  response = MQTTClient_subscribe5(c, test_shared_subscriptions_globals.shared_topic, 2, &subopts, &subs_props);
1767  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
1768 
1769  response = MQTTClient_subscribe5(d, test_shared_subscriptions_globals.shared_topic, 2, &subopts, &subs_props);
1770  assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
1771 
1772  messages_arrived = 0;
1773  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
1774  pubmsg.payloadlen = 11;
1775  pubmsg.retained = 0;
1776  pubmsg.qos = 2;
1777 
1778  test_shared_subscriptions_globals.messages_arrived = 0;
1779  for (i = 0; i < 10; ++i)
1780  {
1781  response = MQTTClient_publishMessage5(c, test_shared_subscriptions_globals.topic, &pubmsg, &dt);
1782  assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
1783 
1784  /* should get the request */
1785  while (test_shared_subscriptions_globals.messages_arrived < i+1 && ++count < 100)
1786  {
1787 #if defined(_WIN32)
1788  Sleep(100);
1789 #else
1790  usleep(100000L);
1791 #endif
1792  }
1793  assert("1 message should have arrived", test_shared_subscriptions_globals.messages_arrived == i+1, "was %d",
1794  test_shared_subscriptions_globals.messages_arrived);
1795  }
1796 
1797  rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
1798 
1800  MQTTProperties_free(&subs_props);
1801  MQTTProperties_free(&connect_props);
1803 
1804 exit:
1805  MyLog(LOGA_INFO, "TEST9: test %s. %d tests run, %d failures.",
1806  (failures == 0) ? "passed" : "failed", tests, failures);
1808  return failures;
1809 }
1810 
1811 
1812 int main(int argc, char** argv)
1813 {
1814  int rc = 0,
1815  i;
1816  int (*tests[])() = {NULL,
1826  };
1827 
1828  xml = fopen("TEST-test1.xml", "w");
1829  fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
1830 
1832 
1833  getopts(argc, argv);
1834 
1835  for (i = 0; i < options.iterations; ++i)
1836  {
1837  if (options.test_no == 0)
1838  { /* run all the tests */
1840  rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
1841  }
1842  else
1843  rc = tests[options.test_no](options); /* run just the selected test */
1844  }
1845 
1846  if (rc == 0)
1847  MyLog(LOGA_INFO, "verdict pass");
1848  else
1849  MyLog(LOGA_INFO, "verdict fail");
1850 
1851  fprintf(xml, "</testsuite>\n");
1852  fclose(xml);
1853  return rc;
1854 }
int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties *props)
Definition: MQTTClient.c:1919
struct Options options
enum MQTTPropertyCodes value
char ** haconnections
Definition: test1.c:50
char * cur_output
Definition: test10.c:266
char * request_topic
Definition: test10.c:1296
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
#define MQTTCLIENT_SUCCESS
Definition: MQTTClient.h:131
MQTTReasonCodes
int test_subscribe_options_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
Definition: test10.c:1505
FILE * xml
Definition: test10.c:263
MQTTProperties props
Definition: paho_c_pub.c:54
char * proxy_connection
Definition: test1.c:51
int test_server_topic_aliases(struct Options options)
Definition: test10.c:642
void myassert(char *filename, int lineno, char *description, int value, char *format,...)
Definition: test10.c:283
MQTTLenString value
char * connection
void MQTTProperties_free(MQTTProperties *props)
MQTTClient d
Definition: test10.c:1656
int published
Definition: test10.c:1110
const char * MQTTReasonCode_toString(enum MQTTReasonCodes value)
struct pubsub_opts opts
Definition: paho_c_pub.c:42
int MQTTProperties_add(MQTTProperties *props, const MQTTProperty *prop)
int main(int argc, char **argv)
Definition: test10.c:1812
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
int MQTTProperty_getType(enum MQTTPropertyCodes value)
#define malloc(x)
Definition: Heap.h:41
int test_subscribe_options(struct Options options)
Definition: test10.c:1538
#define MQTTClient_connectOptions_initializer5
Definition: MQTTClient.h:956
int MQTTClient_createWithOptions(MQTTClient *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTClient_createOptions *options)
Definition: MQTTClient.c:364
long elapsed(START_TIME_TYPE start_time)
Definition: test10.c:247
MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char *topic, MQTTProperties *props)
Definition: MQTTClient.c:2230
MQTTProperty * MQTTProperties_getProperty(MQTTProperties *props, enum MQTTPropertyCodes propid)
char output[3000]
Definition: test10.c:265
struct @79 test_shared_subscriptions_globals
int receive_maximum
Definition: test11.c:920
int MQTTProperties_getNumericValue(MQTTProperties *props, enum MQTTPropertyCodes propid)
int test_subscription_ids_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
Definition: test10.c:749
#define MQTTClient_message_initializer
Definition: MQTTClient.h:327
enum MQTTReasonCodes reasonCode
Definition: MQTTClient.h:991
static char msg_buf[512]
Definition: Log.c:122
void MQTTResponse_free(MQTTResponse response)
Definition: MQTTClient.c:620
MQTTCLIENT_TRACE_LEVELS
Definition: MQTTClient.h:1377
int test_error_reporting(struct Options options)
Definition: test10.c:1019
char * response_topic
Definition: test10.c:1295
#define MQTTClient_willOptions_initializer
Definition: MQTTClient.h:639
int MQTTClient_setDisconnected(MQTTClient handle, void *context, MQTTClient_disconnected *disconnected)
Definition: MQTTClient.c:682
#define LOGA_INFO
Definition: test10.c:159
struct @76 test_qos_1_2_errors_globals
int MQTTClient_setCallbacks(MQTTClient handle, void *context, MQTTClient_connectionLost *cl, MQTTClient_messageArrived *ma, MQTTClient_deliveryComplete *dc)
Definition: MQTTClient.c:1032
void MQTTClient_setTraceCallback(MQTTClient_traceCallback *callback)
Definition: MQTTClient.c:2861
char * topic
Definition: test10.c:1496
constexpr size_t count()
Definition: core.h:960
int test_client_topic_aliases(struct Options options)
Definition: test10.c:384
int test_flow_control_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
Definition: test10.c:891
struct @77 test_request_response_globals
int hacount
Definition: test1.c:52
#define LOGA_DEBUG
Definition: test10.c:158
char * correlation_id
Definition: test10.c:1298
const char * MQTTPropertyName(enum MQTTPropertyCodes value)
MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char *topicName, MQTTClient_message *message, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2401
description
Definition: setup.py:19
int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
Definition: test10.c:361
int packet_type
Definition: test10.c:1111
void test_trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char *message)
Definition: test10.c:1134
enum MQTTPropertyCodes identifier
msgTypes
Definition: MQTTPacket.h:46
int test2_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
Definition: test10.c:606
int disconnected
Definition: test10.c:345
int qos
Definition: test6.c:56
int test_shared_subscriptions_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
Definition: test10.c:1658
void MyLog(int LOGA_level, char *format,...)
Definition: test10.c:163
int test_shared_subscriptions(struct Options options)
Definition: test10.c:1685
int tests
Definition: test10.c:261
#define ARRAY_SIZE(a)
Definition: test10.c:53
START_TIME_TYPE global_start_time
Definition: test10.c:264
void MQTTClient_freeMessage(MQTTClient_message **message)
Definition: MQTTClient.c:601
struct @75 test_topic_aliases_globals
void * MQTTClient
Definition: MQTTClient.h:246
START_TIME_TYPE start_clock(void)
Definition: test10.c:222
int MQTTProperties_propertyCount(MQTTProperties *props, enum MQTTPropertyCodes propid)
int msg_count
Definition: test11.c:559
void MQTTClient_destroy(MQTTClient *handle)
Definition: MQTTClient.c:556
MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions *options, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTClient.c:1658
struct @78 test_subscribe_options_globals
#define MQTTCLIENT_PERSISTENCE_DEFAULT
MQTTProperties properties
Definition: MQTTClient.h:324
const char * name
MQTTProperty * array
int MQTTProperties_hasProperty(MQTTProperties *props, enum MQTTPropertyCodes propid)
int test_request_response(struct Options options)
Definition: test10.c:1360
void getopts(int argc, char **argv)
Definition: test10.c:83
int MQTTClient_setPublished(MQTTClient handle, void *context, MQTTClient_published *published)
Definition: MQTTClient.c:723
int test_request_response_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
Definition: test10.c:1308
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
int test_flow_control(struct Options options)
Definition: test10.c:921
char * test_topic
Definition: test11.c:307
MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char *topic, int qos, MQTTSubscribe_options *opts, MQTTProperties *props)
Definition: MQTTClient.c:2090
MQTTClient c
Definition: test10.c:1656
void write_test_result(void)
Definition: test10.c:269
dictionary context
Definition: test2.py:57
#define START_TIME_TYPE
Definition: test10.c:220
int failures
Definition: test10.c:262
static int messages_arrived
Definition: test10.c:359
void MQTTClient_free(void *memory)
Definition: MQTTClient.c:612
null localtime_s(...)
Definition: chrono.h:286
#define MQTTResponse_initializer
Definition: MQTTClient.h:997
void usage(void)
Definition: test10.c:55
int MQTTClient_deliveryToken
Definition: MQTTClient.h:257
#define MQTTClient_createOptions_initializer
Definition: MQTTClient.h:549
int test_subscription_ids(struct Options options)
Definition: test10.c:782
int MQTTProperties_getNumericValueAt(MQTTProperties *props, enum MQTTPropertyCodes propid, int index)
char * shared_topic
Definition: test10.c:1646
static int blocking_found
Definition: test10.c:910
unsigned char retainHandling
#define MQTTSubscribe_options_initializer
char *const * serverURIs
Definition: MQTTClient.h:913
void test_flow_control_trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char *message)
Definition: test10.c:912
#define assert(a, b, c, d)
Definition: test10.c:258
MQTTProperty property
Definition: paho_c_pub.c:53
enum MQTTReasonCodes rc
Definition: test10.c:1112
void logProperties(MQTTProperties *props)
Definition: test10.c:305
MQTTProperties * properties
Definition: MQTTClient.h:994
Definition: test10.c:1143
int test_no
Definition: test1.c:54
#define MQTTProperties_initializer
int test_qos_1_2_errors(struct Options options)
Definition: test10.c:1147


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48