test95.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2012, 2020 IBM Corp. and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs - correct some compile warnings
16  * Ian Craggs - add binary will message test
17  * Ian Craggs - MQTT V5 updates
18  *******************************************************************************/
19 
20 
28 #include "MQTTAsync.h"
29 #include <string.h>
30 #include <stdlib.h>
31 #include "Thread.h"
32 
33 #if !defined(_WINDOWS)
34  #include <sys/time.h>
35  #include <sys/socket.h>
36  #include <unistd.h>
37  #include <errno.h>
38 #else
39  #include <windows.h>
40 #endif
41 
/* Unique suffix/prefix appended to client ids and topic names (see the
 * sprintf calls at the start of each test). */
char unique[50];

/* Number of elements in a true array (not a pointer). The argument is
 * parenthesised so that expressions can be passed safely. */
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
45 
/**
 * Print the (minimal) help text on stdout and terminate the process
 * with EXIT_FAILURE. Does not return.
 */
void usage(void)
{
    fputs("help!!\n", stdout);
    exit(EXIT_FAILURE);
}
51 
/** Run-time settings of the test program and their default values. */
struct Options
{
    char* connection;        /* server URI used when creating client d */
    char* proxy_connection;  /* server URI used when creating client c */
    int verbose;             /* non-zero: MyLog also prints LOGA_DEBUG messages */
    int test_no;             /* numeric argument of --test_no */
};

struct Options options =
{
    .connection = "mqtt.eclipse.org:1883",
    .proxy_connection = "localhost:1883",
    .verbose = 0,
    .test_no = 0,
};
65 
66 void getopts(int argc, char** argv)
67 {
68  int count = 1;
69 
70  while (count < argc)
71  {
72  if (strcmp(argv[count], "--test_no") == 0)
73  {
74  if (++count < argc)
75  options.test_no = atoi(argv[count]);
76  else
77  usage();
78  }
79  else if (strcmp(argv[count], "--connection") == 0)
80  {
81  if (++count < argc)
82  options.connection = argv[count];
83  else
84  usage();
85  }
86  else if (strcmp(argv[count], "--proxy_connection") == 0)
87  {
88  if (++count < argc)
90  else
91  usage();
92  }
93  else if (strcmp(argv[count], "--verbose") == 0)
94  options.verbose = 1;
95  count++;
96  }
97 }
98 
99 
100 #define LOGA_DEBUG 0
101 #define LOGA_INFO 1
102 #include <stdarg.h>
103 #include <time.h>
104 #include <sys/timeb.h>
105 void MyLog(int LOGA_level, char* format, ...)
106 {
107  static char msg_buf[256];
108  va_list args;
109 #if defined(_WIN32) || defined(_WINDOWS)
110  struct timeb ts;
111 #else
112  struct timeval ts;
113 #endif
114  struct tm timeinfo;
115 
116  if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
117  return;
118 
119 #if defined(_WIN32) || defined(_WINDOWS)
120  ftime(&ts);
121  localtime_s(&timeinfo, &ts.time);
122 #else
123  gettimeofday(&ts, NULL);
124  localtime_r(&ts.tv_sec, &timeinfo);
125 #endif
126  strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
127 
128 #if defined(_WIN32) || defined(_WINDOWS)
129  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
130 #else
131  sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000);
132 #endif
133 
134  va_start(args, format);
135  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
136  va_end(args);
137 
138  printf("%s\n", msg_buf);
139  fflush(stdout);
140 }
141 
/**
 * Suspend the calling thread.
 *
 * @param milliseconds time to sleep, in milliseconds
 */
void MySleep(long milliseconds)
{
#if defined(_WIN32) || defined(_WIN64)
    Sleep(milliseconds);
#else
    /* usleep works in microseconds */
    long microseconds = milliseconds * 1000;

    usleep(microseconds);
#endif
}
150 
/*
 * start_clock(): return the current time in the platform's native
 * representation (START_TIME_TYPE); pass the value to elapsed() later.
 *
 * NOTE(review): the three signature lines were absent from this listing
 * (only the bodies were present). The name `start_clock` is restored from
 * the usual Paho test-program convention - confirm against the callers.
 */
#if defined(_WIN32) || defined(_WINDOWS)
#define START_TIME_TYPE DWORD
static DWORD start_time = 0;
START_TIME_TYPE start_clock(void)
{
    return GetTickCount();
}
#elif defined(AIX)
#define START_TIME_TYPE struct timespec
START_TIME_TYPE start_clock(void)
{
    static struct timespec start;
    clock_gettime(CLOCK_REALTIME, &start);
    return start;
}
#else
#define START_TIME_TYPE struct timeval
/* TODO - unused - remove? static struct timeval start_time; */
START_TIME_TYPE start_clock(void)
{
    struct timeval start_time;
    gettimeofday(&start_time, NULL);
    return start_time;
}
#endif
176 
177 #if defined(_WIN32)
178 long elapsed(START_TIME_TYPE start_time)
179 {
180  return GetTickCount() - start_time;
181 }
182 #elif defined(AIX)
183 #define assert(a)
184 long elapsed(struct timespec start)
185 {
186  struct timespec now, res;
187 
188  clock_gettime(CLOCK_REALTIME, &now);
189  ntimersub(now, start, res);
190  return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
191 }
192 #else
193 long elapsed(START_TIME_TYPE start_time)
194 {
195  struct timeval now, res;
196 
197  gettimeofday(&now, NULL);
198  timersub(&now, &start_time, &res);
199  return (res.tv_sec) * 1000 + (res.tv_usec) / 1000;
200 }
201 #endif
202 
203 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
204 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
205 
206 #define MAXMSGS 30;
207 
208 int tests = 0;
209 int failures = 0;
210 FILE* xml;
212 char output[3000];
214 
215 
217 {
218  long duration = elapsed(global_start_time);
219 
220  fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
221  if (cur_output != output)
222  {
223  fprintf(xml, "%s", output);
224  cur_output = output;
225  }
226  fprintf(xml, "</testcase>\n");
227 }
228 
229 void myassert(char* filename, int lineno, char* description, int value,
230  char* format, ...)
231 {
232  ++tests;
233  if (!value)
234  {
235  va_list args;
236 
237  ++failures;
238  MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s", filename,
239  lineno, description);
240 
241  va_start(args, format);
242  vprintf(format, args);
243  va_end(args);
244 
245  cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
246  description, filename, lineno);
247  }
248  else
249  MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s",
250  filename, lineno, description);
251 }
252 
253 
255 {
256  int i = 0;
257 
258  for (i = 0; i < props->count; ++i)
259  {
260  int id = props->array[i].identifier;
261  const char* name = MQTTPropertyName(id);
262  char* intformat = "Property name %s value %d";
263 
264  switch (MQTTProperty_getType(id))
265  {
267  MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.byte);
268  break;
270  MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer2);
271  break;
273  MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer4);
274  break;
276  MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer4);
277  break;
280  MyLog(LOGA_DEBUG, "Property name %s value len %.*s", name,
281  props->array[i].value.data.len, props->array[i].value.data.data);
282  break;
284  MyLog(LOGA_DEBUG, "Property name %s key %.*s value %.*s", name,
285  props->array[i].value.data.len, props->array[i].value.data.data,
286  props->array[i].value.value.len, props->array[i].value.value.data);
287  break;
288  }
289  }
290 }
291 
292 
294 {
295  int i = 0, rc = 0, count = 0;
296  MQTTAsync_token *tokens;
297 
298  /* acks for outgoing messages could arrive after incoming exchanges are complete */
299  do
300  {
301  rc = MQTTAsync_getPendingTokens(c, &tokens);
302  assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
303  i = 0;
304  if (tokens)
305  {
306  while (tokens[i] != -1)
307  ++i;
308  MQTTAsync_free(tokens);
309  }
310  if (i > 0)
311  MySleep(100);
312  }
313  while (i > 0 && ++count < 10);
314  assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
315 }
316 
317 
319 {
320  int i = 0, rc = 0;
321  MQTTAsync_token *tokens;
322 
323  rc = MQTTAsync_getPendingTokens(c, &tokens);
324  assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
325  i = 0;
326  if (tokens)
327  {
328  while (tokens[i] != -1)
329  ++i;
330  MQTTAsync_free(tokens);
331  }
332  assert("Number of getPendingTokens should be 3", i == 3, "i was %d ", i);
333 }
334 
335 
336 /*********************************************************************
337 
338  Tests: offline buffering - sending messages while disconnected
339 
340  1. send some messages while disconnected, check that they are sent
341  2. repeat test 1 using serverURIs
342  3. repeat test 1 using auto reconnect
343  4. repeat test 2 using auto reconnect
344  5. check max-buffered
345  6. check auto-reconnect parms alter behaviour as expected
346 
347  Tests: automatic reconnect
348 
349  - check that connected() is called
350  - check that reconnect() causes reconnect attempt
351  - check that reconnect() fails if no connect has been previously attempted
352 
353  *********************************************************************/
354 
355 
356 
357 
358 /*********************************************************************
359 
360  Test1: offline buffering - sending messages while disconnected
361 
362  1. call connect
363  2. use proxy to disconnect the client
364  3. while the client is disconnected, send more messages
365  4. when the client reconnects, check that those messages are sent
366 
367  *********************************************************************/
368 
371 
372 int test1_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
373 {
374  MQTTAsync c = (MQTTAsync)context;
375  static int message_count = 0;
376  static int first = 1;
377 
378  if (first == 1)
379  {
380  first = 0;
381  return 0; /* to force queue persistence */
382  }
383 
384  MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
385 
386  if (memcmp(message->payload, "will message", message->payloadlen) == 0)
388  else
390 
391  if (message->struct_version == 1)
392  {
393  assert("Properties count should be > 0", message->properties.count > 0,
394  "Properties count was %d\n", message->properties.count);
395  logProperties(&message->properties);
396  }
397 
398  MQTTAsync_freeMessage(&message);
399  MQTTAsync_free(topicName);
400 
401  return 1;
402 }
403 
405 
407 
409 {
410  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
411 
413  test1Finished = 1;
414 }
415 
417 {
418  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
419 
421  test1Finished = 1;
422 }
423 
425 {
427  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
428  MQTTAsync c = (MQTTAsync)context;
429  int rc;
430  static int done = 0;
431 
432  if (done == 0)
433  {
434  /* send a message to the proxy to break the connection */
435  pubmsg.payload = "TERMINATE";
436  pubmsg.payloadlen = (int)strlen(pubmsg.payload);
437  pubmsg.qos = 0;
438  pubmsg.retained = 0;
439  rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
440  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
441  done = 1; /* only do this once */
442  }
443 }
444 
445 
446 int test1dReady = 0;
447 char willTopic[100];
448 char test_topic[100];
449 
451 {
452  MQTTAsync c = (MQTTAsync)context;
453  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
454  response->reasonCode);
455  test1dReady = 1;
456 }
457 
458 
460 {
461  MQTTAsync c = (MQTTAsync)context;
463  int rc;
464  int qoss[2] = {2, 2};
465  char* topics[2] = {willTopic, test_topic};
466 
467  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
469  opts.context = c;
470 
471  rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
472  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
473  if (rc != MQTTASYNC_SUCCESS)
474  test1Finished = 1;
475 }
476 
478 
479 void test1cConnected(void* context, char* cause)
480 {
481  MQTTAsync c = (MQTTAsync)context;
482 
483  MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
484  test1c_connected = 1;
485 }
486 
487 
488 int test1(struct Options options)
489 {
490  char* testname = "test1";
491  int subsqos = 2;
492  MQTTAsync c, d;
496  int rc = 0;
497  int count = 0;
498  char clientidc[70];
499  char clientidd[70];
500  int i = 0;
504 
505  sprintf(willTopic, "paho-test95-1-%s", unique);
506  sprintf(clientidc, "paho-test95-1-c-%s", unique);
507  sprintf(clientidd, "paho-test95-1-d-%s", unique);
508  sprintf(test_topic, "paho-test95-1-test topic %s", unique);
509 
510  test1Finished = 0;
511  failures = 0;
512  MyLog(LOGA_INFO, "Starting Offline buffering 1 - messages while disconnected");
513  fprintf(xml, "<testcase classname=\"test1\" name=\"%s\"", testname);
515 
516  createOptions.sendWhileDisconnected = 1;
517  createOptions.MQTTVersion = MQTTVERSION_5;
518  rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc,
519  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOptions);
520  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
521  if (rc != MQTTASYNC_SUCCESS)
522  {
523  MQTTAsync_destroy(&c);
524  goto exit;
525  }
526 
527  createOptions.sendWhileDisconnected = 0;
528  createOptions.MQTTVersion = MQTTVERSION_5;
529  rc = MQTTAsync_createWithOptions(&d, options.connection, clientidd,
530  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOptions);
531  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
532  if (rc != MQTTASYNC_SUCCESS)
533  {
534  MQTTAsync_destroy(&c);
535  goto exit;
536  }
537 
538  opts.keepAliveInterval = 20;
539  opts.cleanstart = 1;
540 
541  rc = MQTTAsync_setCallbacks(d, d, NULL, test1_messageArrived, NULL);
542  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
543 
544  opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
545  opts.context = d;
548  MyLog(LOGA_DEBUG, "Connecting client d");
549  rc = MQTTAsync_connect(d, &opts);
550  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
551  if (rc != MQTTASYNC_SUCCESS)
552  {
553  failures++;
554  goto exit;
555  }
556 
557  /* wait until d is ready: connected and subscribed */
558  count = 0;
559  while (!test1dReady && ++count < 10000)
560  MySleep(100);
561  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
562 
564  assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
565 
566  /* let client c go: connect, and send disconnect command to proxy */
567  opts.will = &wopts;
568  opts.will->message = "will message";
569  opts.will->qos = 1;
570  opts.will->retained = 0;
571  opts.will->topicName = willTopic;
572 
573  property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
574  property.value.data.data = "test user property";
575  property.value.data.len = (int)strlen(property.value.data.data);
576  property.value.value.data = "test user property value";
577  property.value.value.len = (int)strlen(property.value.value.data);
578  MQTTProperties_add(&willProps, &property);
579  opts.willProperties = &willProps;
580 
583  opts.context = c;
584 
585  opts.cleanstart = 0;
586  property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
587  property.value.integer4 = 30;
588  MQTTProperties_add(&props, &property);
589 
590  opts.connectProperties = &props;
591 
592  MyLog(LOGA_DEBUG, "Connecting client c");
593  rc = MQTTAsync_connect(c, &opts);
594  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
595  MQTTProperties_free(&props);
596  MQTTProperties_free(&willProps);
597  if (rc != MQTTASYNC_SUCCESS)
598  {
599  failures++;
600  goto exit;
601  }
602 
603  /* wait for will message */
604  while (!test1_will_message_received && ++count < 10000)
605  MySleep(100);
606 
607  MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
608 
609  test1c_connected = 0;
610  /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
611  for (i = 0; i < 3; ++i)
612  {
613  char buf[50];
618 
619  sprintf(buf, "QoS %d message", i);
620  pubmsg.payload = buf;
621  pubmsg.payloadlen = (int)strlen(pubmsg.payload) + 1;
622  pubmsg.qos = i;
623  pubmsg.retained = 0;
624  MyLog(LOGA_DEBUG, "Sending qos %d message", pubmsg.qos);
625 
626  property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
627  property.value.data.data = "test user property";
628  property.value.data.len = (int)strlen(property.value.data.data);
629  property.value.value.data = "test user property value";
630  property.value.value.len = (int)strlen(property.value.value.data);
631  MQTTProperties_add(&props, &property);
632  pubmsg.properties = props;
633  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
634  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
635  MQTTProperties_free(&props);
636  MyLog(LOGA_DEBUG, "Sent qos %d message, token %d", pubmsg.qos, opts.token);
637  }
638 
640 
641  /* destroy and recreate to read from persistence */
642  MyLog(LOGA_DEBUG, "Destroy and recreate client c");
643  MQTTAsync_destroy(&c);
644 
645  createOptions.sendWhileDisconnected = 1;
646  createOptions.MQTTVersion = MQTTVERSION_5;
647  rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc,
648  MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOptions);
649  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
650  if (rc != MQTTASYNC_SUCCESS)
651  {
652  MQTTAsync_destroy(&c);
653  goto exit;
654  }
655 
657 
659  assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
660  opts.will = &wopts;
663  opts.context = c;
664  opts.cleanstart = 0;
665  MyLog(LOGA_DEBUG, "Reconnecting client c");
666  test1c_connected = 0;
667  rc = MQTTAsync_connect(c, &opts);
668  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
669  MQTTProperties_free(&props);
670  if (rc != MQTTASYNC_SUCCESS)
671  {
672  failures++;
673  goto exit;
674  }
675 
676  /* wait for client to be reconnected */
677  while (!test1c_connected && ++count < 10000)
678  MySleep(100);
679 
680  /* wait for messages to be received */
681  while (test1_messages_received < 3 && ++count < 10000)
682  MySleep(100);
683 
685 
686  rc = MQTTAsync_disconnect(c, NULL);
687  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
688 
689  rc = MQTTAsync_disconnect(d, NULL);
690  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
691 
692 exit:
693  MQTTAsync_destroy(&c);
694  MQTTAsync_destroy(&d);
695  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
696  (failures == 0) ? "passed" : "failed", testname, tests, failures);
698  return failures;
699 }
700 
701 
702 /*********************************************************************
703 
704  Test2: offline buffering - sending messages while disconnected
705 
706  1. call connect
707  2. use proxy to disconnect the client
708  3. while the client is disconnected, send more messages
709  4. when the client reconnects, check that those messages are sent
710 
711  *********************************************************************/
712 
715 
716 int test2_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
717 {
718  MQTTAsync c = (MQTTAsync)context;
719  static int message_count = 0;
720 
721  MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
722 
723  if (memcmp(message->payload, "will message", message->payloadlen) == 0)
725  else
727 
728  MQTTAsync_freeMessage(&message);
729  MQTTAsync_free(topicName);
730 
731  return 1;
732 }
733 
735 
737 
739 {
740  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
741 
743  test2Finished = 1;
744 }
745 
747 {
748  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
749 
751  test2Finished = 1;
752 }
753 
755 {
757  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
758  MQTTAsync c = (MQTTAsync)context;
759  int rc;
760 
761  /* send a message to the proxy to break the connection */
762  pubmsg.payload = "TERMINATE";
763  pubmsg.payloadlen = (int)strlen(pubmsg.payload);
764  pubmsg.qos = 0;
765  pubmsg.retained = 0;
766  rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
767  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
768 }
769 
770 
771 int test2dReady = 0;
772 char willTopic[100];
773 char test_topic[100];
774 
776 {
777  MQTTAsync c = (MQTTAsync)context;
778  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
779  response->reasonCode);
780  test2dReady = 1;
781 }
782 
783 
785 {
786  MQTTAsync c = (MQTTAsync)context;
788  int rc;
789  int qoss[2] = {2, 2};
790  char* topics[2] = {willTopic, test_topic};
791 
792  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
794  opts.context = c;
795 
796  rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
797  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
798  if (rc != MQTTASYNC_SUCCESS)
799  test2Finished = 1;
800 }
801 
803 
804 void test2cConnected(void* context, char* cause)
805 {
806  MQTTAsync c = (MQTTAsync)context;
807 
808  MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
809  test2c_connected = 1;
810 }
811 
812 
813 int test2(struct Options options)
814 {
815  char* testname = "test2";
816  int subsqos = 2;
817  MQTTAsync c, d;
821  int rc = 0;
822  int count = 0;
823  char clientidc[70];
824  char clientidd[70];
825  int i = 0;
826  char *URIs[2] = {"rubbish", options.proxy_connection};
829 
830  sprintf(willTopic, "paho-test95-2-%s", unique);
831  sprintf(clientidc, "paho-test95-2-c-%s", unique);
832  sprintf(clientidd, "paho-test95-2-d-%s", unique);
833  sprintf(test_topic, "paho-test95-2-test topic %s", unique);
834 
835  test2Finished = 0;
836  failures = 0;
837  MyLog(LOGA_INFO, "Starting Offline buffering 2 - messages while disconnected with serverURIs");
838  fprintf(xml, "<testcase classname=\"test2\" name=\"%s\"", testname);
840 
841  createOptions.sendWhileDisconnected = 1;
842  createOptions.MQTTVersion = MQTTVERSION_5;
843  rc = MQTTAsync_createWithOptions(&c, "not used", clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
844  NULL, &createOptions);
845  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
846  if (rc != MQTTASYNC_SUCCESS)
847  {
848  MQTTAsync_destroy(&c);
849  goto exit;
850  }
851 
852  createOptions.sendWhileDisconnected = 0;
853  createOptions.MQTTVersion = MQTTVERSION_5;
855  NULL, &createOptions);
856  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
857  if (rc != MQTTASYNC_SUCCESS)
858  {
859  MQTTAsync_destroy(&c);
860  goto exit;
861  }
862 
863  opts.keepAliveInterval = 20;
864  opts.cleanstart = 1;
865  opts.MQTTVersion = MQTTVERSION_5;
866 
867  rc = MQTTAsync_setCallbacks(d, d, NULL, test2_messageArrived, NULL);
868  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
869 
870  opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
871  opts.context = d;
874  MyLog(LOGA_DEBUG, "Connecting client d");
875  rc = MQTTAsync_connect(d, &opts);
876  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
877  if (rc != MQTTASYNC_SUCCESS)
878  {
879  failures++;
880  goto exit;
881  }
882 
883  /* wait until d is ready: connected and subscribed */
884  count = 0;
885  while (!test2dReady && ++count < 10000)
886  MySleep(100);
887  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
888 
890  assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
891 
892  /* let client c go: connect, and send disconnect command to proxy */
893  opts.will = &wopts;
894  opts.will->message = "will message";
895  opts.will->qos = 1;
896  opts.will->retained = 0;
897  opts.will->topicName = willTopic;
900  opts.context = c;
901  opts.MQTTVersion = MQTTVERSION_5;
902  opts.cleanstart = 0;
903  property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
904  property.value.integer4 = 30;
905  MQTTProperties_add(&props, &property);
906  opts.connectProperties = &props;
907  opts.serverURIs = URIs;
908  opts.serverURIcount = 2;
909 
910  MyLog(LOGA_DEBUG, "Connecting client c");
911  rc = MQTTAsync_connect(c, &opts);
912  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
913  if (rc != MQTTASYNC_SUCCESS)
914  {
915  failures++;
916  goto exit;
917  }
918  MQTTProperties_free(&props);
919 
920  /* wait for will message */
921  while (!test2_will_message_received && ++count < 10000)
922  MySleep(100);
923 
924  MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
925 
926  test2c_connected = 0;
927  /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
928  for (i = 0; i < 3; ++i)
929  {
930  char buf[50];
931 
934  sprintf(buf, "QoS %d message", i);
935  pubmsg.payload = buf;
936  pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
937  pubmsg.qos = i;
938  pubmsg.retained = 0;
939  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
940  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
941  }
942 
944 
945  rc = MQTTAsync_reconnect(c);
946  assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
947 
948  /* wait for client to be reconnected */
949  while (!test2c_connected && ++count < 10000)
950  MySleep(100);
951 
952  /* wait for success or failure callback */
953  while (test2_messages_received < 3 && ++count < 10000)
954  MySleep(100);
955 
957 
958  rc = MQTTAsync_disconnect(c, NULL);
959  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
960 
961  rc = MQTTAsync_disconnect(d, NULL);
962  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
963 
964 exit:
965  MySleep(200);
966  MQTTAsync_destroy(&c);
967  MQTTAsync_destroy(&d);
968  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
969  (failures == 0) ? "passed" : "failed", testname, tests, failures);
971  return failures;
972 }
973 
974 /*********************************************************************
975 
976  test3: offline buffering - sending messages while disconnected
977 
978  1. call connect
979  2. use proxy to disconnect the client
980  3. while the client is disconnected, send more messages
981  4. when the client auto reconnects, check that those messages are sent
982 
983  *********************************************************************/
984 
987 
988 int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
989 {
990  MQTTAsync c = (MQTTAsync)context;
991  static int message_count = 0;
992 
993  MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
994 
995  if (memcmp(message->payload, "will message", message->payloadlen) == 0)
997  else
999 
1000  MQTTAsync_freeMessage(&message);
1001  MQTTAsync_free(topicName);
1002 
1003  return 1;
1004 }
1005 
1007 
1009 
1011 {
1012  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
1013 
1015  test3Finished = 1;
1016 }
1017 
1019 {
1020  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
1021 
1023  test3Finished = 1;
1024 }
1025 
1027 {
1029  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
1030  MQTTAsync c = (MQTTAsync)context;
1031  int rc;
1032 
1033  /* send a message to the proxy to break the connection */
1034  pubmsg.payload = "TERMINATE";
1035  pubmsg.payloadlen = (int)strlen(pubmsg.payload);
1036  pubmsg.qos = 0;
1037  pubmsg.retained = 0;
1038  rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
1039  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1040 }
1041 
1042 
1043 int test3dReady = 0;
1044 char willTopic[100];
1045 char test_topic[100];
1046 
1048 {
1049  MQTTAsync c = (MQTTAsync)context;
1050  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
1051  response->reasonCode);
1052  test3dReady = 1;
1053 }
1054 
1055 
1057 {
1058  MQTTAsync c = (MQTTAsync)context;
1060  int rc;
1061  int qoss[2] = {2, 2};
1062  char* topics[2] = {willTopic, test_topic};
1063 
1064  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
1066  opts.context = c;
1067 
1068  rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
1069  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1070  if (rc != MQTTASYNC_SUCCESS)
1071  test3Finished = 1;
1072 }
1073 
1075 
1076 void test3cConnected(void* context, char* cause)
1077 {
1078  MQTTAsync c = (MQTTAsync)context;
1079 
1080  MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
1081  test3c_connected = 1;
1082 }
1083 
1084 
1085 int test3(struct Options options)
1086 {
1087  char* testname = "test3";
1088  int subsqos = 2;
1089  MQTTAsync c, d;
1093  int rc = 0;
1094  int count = 0;
1095  char clientidc[70];
1096  char clientidd[70];
1097  int i = 0;
1100 
1101  sprintf(willTopic, "paho-test95-3-%s", unique);
1102  sprintf(clientidc, "paho-test95-3-c-%s", unique);
1103  sprintf(clientidd, "paho-test95-3-d-%s", unique);
1104  sprintf(test_topic, "paho-test95-3-test topic %s", unique);
1105 
1106  test3Finished = 0;
1107  failures = 0;
1108  MyLog(LOGA_INFO, "Starting Offline buffering 3 - messages while disconnected");
1109  fprintf(xml, "<testcase classname=\"test3\" name=\"%s\"", testname);
1111 
1112  createOptions.sendWhileDisconnected = 1;
1113  createOptions.MQTTVersion = MQTTVERSION_5;
1115  NULL, &createOptions);
1116  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
1117  if (rc != MQTTASYNC_SUCCESS)
1118  {
1119  MQTTAsync_destroy(&c);
1120  goto exit;
1121  }
1122 
1123  createOptions.sendWhileDisconnected = 0;
1124  createOptions.MQTTVersion = MQTTVERSION_5;
1126  NULL, &createOptions);
1127  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
1128  if (rc != MQTTASYNC_SUCCESS)
1129  {
1130  MQTTAsync_destroy(&c);
1131  goto exit;
1132  }
1133 
1134  opts.keepAliveInterval = 20;
1135  opts.cleanstart = 1;
1136  opts.MQTTVersion = MQTTVERSION_5;
1137 
1138  rc = MQTTAsync_setCallbacks(d, d, NULL, test3_messageArrived, NULL);
1139  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1140 
1141  opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
1142  opts.context = d;
1143  opts.onSuccess5 = test3dOnConnect;
1144  opts.onFailure5 = test3dOnFailure;
1145  MyLog(LOGA_DEBUG, "Connecting client d");
1146  rc = MQTTAsync_connect(d, &opts);
1147  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1148  if (rc != MQTTASYNC_SUCCESS)
1149  {
1150  failures++;
1151  goto exit;
1152  }
1153 
1154  /* wait until d is ready: connected and subscribed */
1155  count = 0;
1156  while (!test3dReady && ++count < 10000)
1157  MySleep(100);
1158  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
1159 
1161  assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1162 
1163  /* let client c go: connect, and send disconnect command to proxy */
1164  opts.will = &wopts;
1165  opts.will->message = "will message";
1166  opts.will->qos = 1;
1167  opts.will->retained = 0;
1168  opts.will->topicName = willTopic;
1169  opts.onSuccess5 = test3cOnConnect;
1170  opts.onFailure5 = test3cOnFailure;
1171  opts.context = c;
1172  opts.cleanstart = 0;
1173  property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
1174  property.value.integer4 = 30;
1175  MQTTProperties_add(&props, &property);
1176  opts.connectProperties = &props;
1177  opts.MQTTVersion = MQTTVERSION_5;
1178  opts.automaticReconnect = 1;
1179 
1180  MyLog(LOGA_DEBUG, "Connecting client c");
1181  rc = MQTTAsync_connect(c, &opts);
1182  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1183  MQTTProperties_free(&props);
1184  if (rc != MQTTASYNC_SUCCESS)
1185  {
1186  failures++;
1187  goto exit;
1188  }
1189 
1190  /* wait for will message */
1191  while (!test3_will_message_received && ++count < 10000)
1192  MySleep(100);
1193 
1194  MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
1195 
1196  test3c_connected = 0;
1197  /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
1198  for (i = 0; i < 3; ++i)
1199  {
1200  char buf[50];
1201 
1204  sprintf(buf, "QoS %d message", i);
1205  pubmsg.payload = buf;
1206  pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
1207  pubmsg.qos = i;
1208  pubmsg.retained = 0;
1209  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
1210  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1211  }
1212 
1214 
1215  /* wait for client to be reconnected */
1216  while (!test3c_connected && ++count < 10000)
1217  MySleep(100);
1218 
1219  /* wait for success or failure callback */
1220  while (test3_messages_received < 3 && ++count < 10000)
1221  MySleep(100);
1222 
1224 
1225  rc = MQTTAsync_disconnect(c, NULL);
1226  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1227 
1228  rc = MQTTAsync_disconnect(d, NULL);
1229  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1230 
1231 exit:
1232  MySleep(200);
1233  MQTTAsync_destroy(&c);
1234  MQTTAsync_destroy(&d);
1235  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
1236  (failures == 0) ? "passed" : "failed", testname, tests, failures);
1238  return failures;
1239 }
1240 
1241 /*********************************************************************
1242 
1243  test4: offline buffering - sending messages while disconnected
1244 
1245  1. call connect
1246  2. use proxy to disconnect the client
1247  3. while the client is disconnected, send more messages
1248  4. when the client auto reconnects, check that those messages are sent
1249 
1250  *********************************************************************/
1251 
1254 
/**
 * Message-arrived callback used by test4 (installed on client d with
 * MQTTAsync_setCallbacks in test4()).
 *
 * Logs the topic and payload, compares the payload with the text
 * "will message", then releases the message and the topic name.
 *
 * @param context   callback context; cast to MQTTAsync but not otherwise used
 *                  in the lines visible here
 * @param topicName topic the message arrived on; freed with MQTTAsync_free
 * @param topicLen  not used in the lines visible here
 * @param message   received message; freed with MQTTAsync_freeMessage
 * @return always 1
 */
1255 int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
1256 {
1257  MQTTAsync c = (MQTTAsync)context;
1258  static int message_count = 0;
1259 
1260  MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen,
1261  message->payload);
1262 
      /* NOTE(review): listing lines 1264 and 1266 (the bodies of this if/else)
       * are missing from this extract.
       * NOTE(review): memcmp reads message->payloadlen bytes from the 13-byte
       * literal; a payload longer than that reads past the end of the literal. */
1263  if (memcmp(message->payload, "will message", message->payloadlen) == 0)
1265  else
1267 
1268  MQTTAsync_freeMessage(&message);
1269  MQTTAsync_free(topicName);
1270 
1271  return 1;
1272 }
1273 
1275 
1277 
1279 {
1280  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
1281 
1283  test4Finished = 1;
1284 }
1285 
1287 {
1288  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
1289 
1291  test4Finished = 1;
1292 }
1293 
1295 {
1297  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
1298  MQTTAsync c = (MQTTAsync)context;
1299  int rc;
1300 
1301  /* send a message to the proxy to break the connection */
1302  pubmsg.payload = "TERMINATE";
1303  pubmsg.payloadlen = (int)strlen(pubmsg.payload);
1304  pubmsg.qos = 0;
1305  pubmsg.retained = 0;
1306  rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
1307  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1308 }
1309 
1310 
/* Set to 1 by the test4 subscribe-success callback; test4() polls it while
 * waiting for client d to become ready. */
1311 int test4dReady = 0;
/* Will topic name; each test function fills it in with sprintf before use.
 * The same array is declared again further down the file. */
1312 char willTopic[100];
/* Topic the test messages are published to; filled in by each test function. */
1313 char test_topic[100];
1314 
1316 {
1317  MQTTAsync c = (MQTTAsync)context;
1318  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
1319  response->reasonCode);
1320  test4dReady = 1;
1321 }
1322 
1323 
1325 {
1326  MQTTAsync c = (MQTTAsync)context;
1328  int rc;
1329  int qoss[2] = {2, 2};
1330  char* topics[2] = {willTopic, test_topic};
1331 
1332  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
1334  opts.context = c;
1335 
1336  rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
1337  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1338  if (rc != MQTTASYNC_SUCCESS)
1339  test4Finished = 1;
1340 }
1341 
1343 
1344 void test4cConnected(void* context, char* cause)
1345 {
1346  MQTTAsync c = (MQTTAsync)context;
1347 
1348  MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
1349  test4c_connected = 1;
1350 }
1351 
1352 
/**
 * Test 4: offline buffering - messages sent while disconnected, with a
 * serverURIs list supplied at connect time.
 *
 * Visible flow: create two clients, connect d and wait for test4dReady,
 * connect c (will message, session expiry 30, automatic reconnect, server
 * list {"rubbish", options.proxy_connection}), wait for the will message,
 * publish three messages with QoS 0..2 through c, wait for c to report
 * connected and for three messages to be counted, then disconnect and
 * destroy both clients.
 *
 * NOTE(review): this extract is missing several listing lines (1358-1360,
 * 1367-1368, 1379, 1394, 1428, 1471-1472, 1482, 1492, 1506), so some
 * declarations and calls are not visible here.
 *
 * @param options test options; only proxy_connection is read in the visible lines
 * @return the global failures count
 */
1353 int test4(struct Options options)
1354 {
1355  char* testname = "test4";
1356  int subsqos = 2;
1357  MQTTAsync c, d;
      /* NOTE(review): c and d have no initializer; the declarations of
       * createOptions, opts, wopts, pubmsg, props and property used below are
       * on listing lines not present in this extract. */
1361  int rc = 0;
1362  int count = 0;
1363  char clientidc[70];
1364  char clientidd[70];
1365  int i = 0;
1366  char *URIs[2] = {"rubbish", options.proxy_connection};
1369 
      /* Topic names and client ids are made unique per run with the global suffix. */
1370  sprintf(willTopic, "paho-test95-4-%s", unique);
1371  sprintf(clientidc, "paho-test95-4-c-%s", unique);
1372  sprintf(clientidd, "paho-test95-4-d-%s", unique);
1373  sprintf(test_topic, "paho-test95-4-test topic %s", unique);
1374 
1375  test4Finished = 0;
1376  failures = 0;
1377  MyLog(LOGA_INFO, "Starting Offline buffering 4 - messages while disconnected with serverURIs");
1378  fprintf(xml, "<testcase classname=\"test4\" name=\"%s\"", testname);
1380 
      /* Client c: sendWhileDisconnected enabled, placeholder server URI; the
       * real candidates come from URIs[] at connect time. */
1381  createOptions.sendWhileDisconnected = 1;
1382  createOptions.MQTTVersion = MQTTVERSION_5;
1383  rc = MQTTAsync_createWithOptions(&c, "not used", clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
1384  NULL, &createOptions);
1385  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
1386  if (rc != MQTTASYNC_SUCCESS)
1387  {
      /* NOTE(review): the exit path destroys c a second time and also destroys
       * d, which has not been assigned on this path. */
1388  MQTTAsync_destroy(&c);
1389  goto exit;
1390  }
1391 
      /* Second client: sendWhileDisconnected disabled.
       * NOTE(review): the first line of this create call (listing line 1394)
       * is missing; presumably it creates d with clientidd - confirm. */
1392  createOptions.sendWhileDisconnected = 0;
1393  createOptions.MQTTVersion = MQTTVERSION_5;
1395  NULL, &createOptions);
1396  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
1397  if (rc != MQTTASYNC_SUCCESS)
1398  {
1399  MQTTAsync_destroy(&c);
1400  goto exit;
1401  }
1402 
1403  opts.keepAliveInterval = 20;
1404  opts.cleanstart = 1;
1405 
      /* d receives messages through test4_messageArrived, with d itself as context. */
1406  rc = MQTTAsync_setCallbacks(d, d, NULL, test4_messageArrived, NULL);
1407  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1408 
1409  opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
1410  opts.context = d;
1411  opts.onSuccess5 = test4dOnConnect;
1412  opts.onFailure5 = test4dOnFailure;
1413  MyLog(LOGA_DEBUG, "Connecting client d");
1414  rc = MQTTAsync_connect(d, &opts);
1415  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1416  if (rc != MQTTASYNC_SUCCESS)
1417  {
1418  failures++;
1419  goto exit;
1420  }
1421 
1422  /* wait until d is ready: connected and subscribed */
      /* count is reset only here; the later wait loops keep incrementing the
       * same counter, so all waits share one 10000-iteration budget. */
1423  count = 0;
1424  while (!test4dReady && ++count < 10000)
1425  MySleep(100);
1426  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
1427 
      /* NOTE(review): listing line 1428, the call whose rc is checked next, is missing. */
1429  assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1430 
1431  /* let client c go: connect, and send disconnect command to proxy */
1432  opts.will = &wopts;
1433  opts.will->message = "will message";
1434  opts.will->qos = 1;
1435  opts.will->retained = 0;
1436  opts.will->topicName = willTopic;
1437  opts.onSuccess5 = test4cOnConnect;
1438  opts.onFailure5 = test4cOnFailure;
1439  opts.context = c;
1440  opts.cleanstart = 0;
      /* Session expiry interval of 30, passed as a connect property. */
1441  property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
1442  property.value.integer4 = 30;
1443  MQTTProperties_add(&props, &property);
1444  opts.connectProperties = &props;
      /* Candidate server list: "rubbish" followed by options.proxy_connection. */
1445  opts.serverURIs = URIs;
1446  opts.serverURIcount = 2;
1447  opts.automaticReconnect = 1;
1448 
1449  MyLog(LOGA_DEBUG, "Connecting client c");
1450  rc = MQTTAsync_connect(c, &opts);
1451  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1452  MQTTProperties_free(&props);
1453  if (rc != MQTTASYNC_SUCCESS)
1454  {
1455  failures++;
1456  goto exit;
1457  }
1458 
1459  /* wait for will message */
1460  while (!test4_will_message_received && ++count < 10000)
1461  MySleep(100);
1462 
1463  MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
1464 
1465  test4c_connected = 0;
1466  /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
1467  for (i = 0; i < 3; ++i)
1468  {
1469  char buf[50];
1470 
      /* NOTE(review): listing lines 1471-1472 are missing here.
       * Each message uses QoS i; payloadlen includes the terminating NUL. */
1473  sprintf(buf, "QoS %d message", i);
1474  pubmsg.payload = buf;
1475  pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
1476  pubmsg.qos = i;
1477  pubmsg.retained = 0;
1478  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
1479  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1480  }
1481 
1483 
1484  /* wait for client to be reconnected */
1485  while (!test4c_connected && ++count < 10000)
1486  MySleep(100);
1487 
1488  /* wait for success or failure callback */
1489  while (test4_messages_received < 3 && ++count < 10000)
1490  MySleep(100);
1491 
1493 
1494  rc = MQTTAsync_disconnect(c, NULL);
1495  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1496 
1497  rc = MQTTAsync_disconnect(d, NULL);
1498  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1499 
      /* Common cleanup: short pause, destroy both clients, log the verdict. */
1500 exit:
1501  MySleep(200);
1502  MQTTAsync_destroy(&c);
1503  MQTTAsync_destroy(&d);
1504  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
1505  (failures == 0) ? "passed" : "failed", testname, tests, failures);
1507  return failures;
1508 }
1509 
1510 
1511 /*********************************************************************
1512 
1513  test5: offline buffering - check max buffered
1514 
1515  1. call connect
1516  2. use proxy to disconnect the client
1517  3. while the client is disconnected, send more messages
1518  4. when the client reconnects, check that those messages are sent
1519 
1520  *********************************************************************/
1521 
1527 
/**
 * Message-arrived callback used by test5 and test6 (both install it on
 * client d with MQTTAsync_setCallbacks).
 *
 * Logs the topic and payload, compares the payload with the text
 * "will message", then releases the message and the topic name.
 *
 * @param context   callback context; cast to MQTTAsync but not otherwise used
 *                  in the lines visible here
 * @param topicName topic the message arrived on; freed with MQTTAsync_free
 * @param topicLen  not used in the lines visible here
 * @param message   received message; freed with MQTTAsync_freeMessage
 * @return always 1
 */
1528 int test5_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
1529 {
1530  MQTTAsync c = (MQTTAsync)context;
1531  static int message_count = 0;
1532 
1533  MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
1534 
      /* NOTE(review): listing lines 1536 and 1538 (the bodies of this if/else)
       * are missing from this extract.
       * NOTE(review): memcmp reads message->payloadlen bytes from the 13-byte
       * literal; a payload longer than that reads past the end of the literal. */
1535  if (memcmp(message->payload, "will message", message->payloadlen) == 0)
1537  else
1539 
1540  MQTTAsync_freeMessage(&message);
1541  MQTTAsync_free(topicName);
1542 
1543  return 1;
1544 }
1545 
1547 {
1548  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
1549 
1551  test5Finished = 1;
1552 }
1553 
1555 {
1556  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
1557 
1559  test5Finished = 1;
1560 }
1561 
1563 {
1565  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
1566  MQTTAsync c = (MQTTAsync)context;
1567  int rc;
1568 
1569  /* send a message to the proxy to break the connection */
1570  pubmsg.payload = "TERMINATE";
1571  pubmsg.payloadlen = (int)strlen(pubmsg.payload);
1572  pubmsg.qos = 0;
1573  pubmsg.retained = 0;
1574  rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
1575  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1576 }
1577 
1578 
/* Set to 1 by the test5 subscribe-success callback; test5() and test6() poll
 * it while waiting for client d to become ready. */
1579 int test5dReady = 0;
/* Will topic name; each test function fills it in with sprintf before use.
 * The same array is also declared earlier in the file. */
1580 char willTopic[100];
/* Topic the test messages are published to; filled in by each test function. */
1581 char test_topic[100];
1582 
1584 {
1585  MQTTAsync c = (MQTTAsync)context;
1586  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
1587  response->reasonCode);
1588  test5dReady = 1;
1589 }
1590 
1591 
1593 {
1594  MQTTAsync c = (MQTTAsync)context;
1596  int rc;
1597  int qoss[2] = {2, 2};
1598  char* topics[2] = {willTopic, test_topic};
1599 
1600  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
1602  opts.context = c;
1603 
1604  rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
1605  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1606  if (rc != MQTTASYNC_SUCCESS)
1607  test5Finished = 1;
1608 }
1609 
1610 void test5cConnected(void* context, char* cause)
1611 {
1612  MQTTAsync c = (MQTTAsync)context;
1613 
1614  MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
1615  test5c_connected = 1;
1616 }
1617 
1618 
/**
 * Test 5: offline buffering - check the maximum number of buffered messages.
 *
 * Visible flow: create two clients (the first with sendWhileDisconnected = 1
 * and maxBufferedMessages = 3), connect d and wait for test5dReady, connect c
 * with a will message and session expiry 30, wait for the will message, then
 * attempt five publishes: the first three must succeed and the last two must
 * return MQTTASYNC_MAX_BUFFERED_MESSAGES. Afterwards c is reconnected
 * explicitly, the test waits for three messages to be counted, and both
 * clients are disconnected and destroyed.
 *
 * NOTE(review): this extract is missing several listing lines (1624-1626,
 * 1632-1633, 1644, 1649, 1661, 1696, 1737-1738, 1751, 1764, 1778), so some
 * declarations and calls are not visible here.
 *
 * @param options test options; not read in the visible lines
 * @return the global failures count
 */
1619 int test5(struct Options options)
1620 {
1621  char* testname = "test5";
1622  int subsqos = 2;
1623  MQTTAsync c, d;
      /* NOTE(review): c and d have no initializer; the declarations of
       * createOptions, opts, wopts, pubmsg, props and property used below are
       * on listing lines not present in this extract. */
1627  int rc = 0;
1628  int count = 0;
1629  char clientidc[70];
1630  char clientidd[70];
1631  int i = 0;
1634 
      /* Topic names and client ids are made unique per run with the global suffix. */
1635  sprintf(willTopic, "paho-test95-5-%s", unique);
1636  sprintf(clientidc, "paho-test95-5-c-%s", unique);
1637  sprintf(clientidd, "paho-test95-5-d-%s", unique);
1638  sprintf(test_topic, "paho-test95-5-test topic %s", unique);
1639 
1640  test5Finished = 0;
1641  failures = 0;
1642  MyLog(LOGA_INFO, "Starting Offline buffering 5 - max buffered");
1643  fprintf(xml, "<testcase classname=\"test5\" name=\"%s\"", testname);
1645 
      /* First client: buffering while disconnected, capped at 3 messages.
       * NOTE(review): the first line of this create call (listing line 1649)
       * is missing; presumably it creates c with clientidc - confirm. */
1646  createOptions.sendWhileDisconnected = 1;
1647  createOptions.maxBufferedMessages = 3;
1648  createOptions.MQTTVersion = MQTTVERSION_5;
1650  NULL, &createOptions);
1651  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
1652  if (rc != MQTTASYNC_SUCCESS)
1653  {
      /* NOTE(review): the exit path destroys c a second time and also destroys
       * d, which has not been assigned in the visible lines on this path. */
1654  MQTTAsync_destroy(&c);
1655  goto exit;
1656  }
1657 
      /* Second client: no buffering.
       * NOTE(review): the first line of this create call (listing line 1661)
       * is missing; presumably it creates d with clientidd - confirm. */
1658  createOptions.sendWhileDisconnected = 0;
1659  createOptions.maxBufferedMessages = 0;
1660  createOptions.MQTTVersion = MQTTVERSION_5;
1662  NULL, &createOptions);
1663  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
1664  if (rc != MQTTASYNC_SUCCESS)
1665  {
1666  MQTTAsync_destroy(&c);
1667  goto exit;
1668  }
1669 
1670  opts.keepAliveInterval = 20;
1671  opts.cleanstart = 1;
1672 
      /* d receives messages through test5_messageArrived, with d itself as context. */
1673  rc = MQTTAsync_setCallbacks(d, d, NULL, test5_messageArrived, NULL);
1674  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1675 
1676  opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
1677  opts.context = d;
1678  opts.onSuccess5 = test5dOnConnect;
1679  opts.onFailure5 = test5dOnFailure;
1680  opts.MQTTVersion = MQTTVERSION_5;
1681  MyLog(LOGA_DEBUG, "Connecting client d");
1682  rc = MQTTAsync_connect(d, &opts);
1683  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1684  if (rc != MQTTASYNC_SUCCESS)
1685  {
1686  failures++;
1687  goto exit;
1688  }
1689 
1690  /* wait until d is ready: connected and subscribed */
      /* count is reset only here; the later wait loops keep incrementing the
       * same counter, so all waits share one 10000-iteration budget. */
1691  count = 0;
1692  while (!test5dReady && ++count < 10000)
1693  MySleep(100);
1694  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
1695 
      /* NOTE(review): listing line 1696, the call whose rc is checked next, is missing. */
1697  assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1698 
1699  /* let client c go: connect, and send disconnect command to proxy */
1700  opts.will = &wopts;
1701  opts.will->message = "will message";
1702  opts.will->qos = 1;
1703  opts.will->retained = 0;
1704  opts.will->topicName = willTopic;
1705  opts.onSuccess5 = test5cOnConnect;
1706  opts.onFailure5 = test5cOnFailure;
1707  opts.context = c;
1708  opts.cleanstart = 0;
1709  opts.MQTTVersion = MQTTVERSION_5;
      /* Session expiry interval of 30, passed as a connect property. */
1710  property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
1711  property.value.integer4 = 30;
1712  MQTTProperties_add(&props, &property);
1713  opts.connectProperties = &props;
1714 
1715  MyLog(LOGA_DEBUG, "Connecting client c");
1716  rc = MQTTAsync_connect(c, &opts);
1717  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1718  MQTTProperties_free(&props);
1719  if (rc != MQTTASYNC_SUCCESS)
1720  {
1721  failures++;
1722  goto exit;
1723  }
1724 
1725  /* wait for will message */
1726  while (!test5_will_message_received && ++count < 10000)
1727  MySleep(100);
1728 
1729  MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
1730 
1731  test5c_connected = 0;
1732  /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
1733  for (i = 0; i < 5; ++i)
1734  {
1735  char buf[50];
1736 
      /* NOTE(review): listing lines 1737-1738 are missing here.
       * QoS cycles 0,1,2,0,1; payloadlen includes the terminating NUL. */
1739  sprintf(buf, "QoS %d message", i);
1740  pubmsg.payload = buf;
1741  pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
1742  pubmsg.qos = i % 3;
1743  pubmsg.retained = 0;
1744  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
      /* The first three sends fit within maxBufferedMessages = 3; the
       * remaining two are expected to be rejected. */
1745  if (i <= 2)
1746  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1747  else
1748  assert("Bad rc from sendMessage", rc == MQTTASYNC_MAX_BUFFERED_MESSAGES, "rc was %d ", rc);
1749  }
1750 
1752 
      /* Unlike test4, the reconnect here is requested explicitly. */
1753  rc = MQTTAsync_reconnect(c);
1754  assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1755 
1756  /* wait for client to be reconnected */
1757  while (!test5c_connected && ++count < 10000)
1758  MySleep(100);
1759 
1760  /* wait for success or failure callback */
1761  while (test5_messages_received < 3 && ++count < 10000)
1762  MySleep(100);
1763 
1765 
1766  rc = MQTTAsync_disconnect(c, NULL);
1767  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1768 
1769  rc = MQTTAsync_disconnect(d, NULL);
1770  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1771 
      /* Common cleanup: short pause, destroy both clients, log the verdict. */
1772 exit:
1773  MySleep(200);
1774  MQTTAsync_destroy(&c);
1775  MQTTAsync_destroy(&d);
1776  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
1777  (failures == 0) ? "passed" : "failed", testname, tests, failures);
1779  return failures;
1780 }
1781 
1782 
/**
 * Test 6: offline buffering - max buffered messages, with a binary will.
 *
 * Follows the same visible flow as test5 and reuses the test5 callbacks and
 * state flags (test5Finished, test5c_connected, test5dReady,
 * test5_will_message_received, test5_messages_received). The difference in
 * the visible lines is that the will is set through will->payload.data and
 * will->payload.len rather than will->message.
 *
 * NOTE(review): this extract is missing several listing lines (1788-1790,
 * 1796-1797, 1799-1800, 1802, 1814, 1819, 1831, 1866, 1908-1909, 1922, 1935,
 * 1949), so some declarations, resets and calls are not visible here.
 *
 * @param options test options; not read in the visible lines
 * @return the global failures count
 */
1783 int test6(struct Options options)
1784 {
1785  char* testname = "test6";
1786  int subsqos = 2;
1787  MQTTAsync c, d;
      /* NOTE(review): c and d have no initializer; the declarations of
       * createOptions, opts, wopts, pubmsg, props and property used below are
       * on listing lines not present in this extract. */
1791  int rc = 0;
1792  int count = 0;
1793  char clientidc[70];
1794  char clientidd[70];
1795  int i = 0;
1798 
      /* Reset shared test5 state left over from a previous test run. */
1801  test5Finished = 0;
1803  test5c_connected = 0;
1804 
1805  sprintf(willTopic, "paho-test95-6-%s", unique);
1806  sprintf(clientidc, "paho-test95-6-c-%s", unique);
1807  sprintf(clientidd, "paho-test95-6-d-%s", unique);
1808  sprintf(test_topic, "paho-test95-6-test topic %s", unique);
1809 
1810  test5Finished = 0;
1811  failures = 0;
1812  MyLog(LOGA_INFO, "Starting Offline buffering 6 - max buffered with binary will");
1813  fprintf(xml, "<testcase classname=\"test6\" name=\"%s\"", testname);
1815 
      /* First client: buffering while disconnected, capped at 3 messages.
       * NOTE(review): the first line of this create call (listing line 1819)
       * is missing; presumably it creates c with clientidc - confirm. */
1816  createOptions.sendWhileDisconnected = 1;
1817  createOptions.maxBufferedMessages = 3;
1818  createOptions.MQTTVersion = MQTTVERSION_5;
1820  NULL, &createOptions);
1821  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
1822  if (rc != MQTTASYNC_SUCCESS)
1823  {
      /* NOTE(review): the exit path destroys c a second time and also destroys
       * d, which has not been assigned in the visible lines on this path. */
1824  MQTTAsync_destroy(&c);
1825  goto exit;
1826  }
1827 
      /* Second client: no buffering.
       * NOTE(review): the first line of this create call (listing line 1831)
       * is missing; presumably it creates d with clientidd - confirm. */
1828  createOptions.sendWhileDisconnected = 0;
1829  createOptions.maxBufferedMessages = 0;
1830  createOptions.MQTTVersion = MQTTVERSION_5;
1832  NULL, &createOptions);
1833  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
1834  if (rc != MQTTASYNC_SUCCESS)
1835  {
1836  MQTTAsync_destroy(&c);
1837  goto exit;
1838  }
1839 
1840  opts.keepAliveInterval = 20;
1841  opts.cleanstart = 1;
1842  opts.MQTTVersion = MQTTVERSION_5;
1843 
1844  rc = MQTTAsync_setCallbacks(d, d, NULL, test5_messageArrived, NULL);
1845  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1846 
1847  opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
1848  opts.context = d;
1849  opts.onSuccess5 = test5dOnConnect;
1850  opts.onFailure5 = test5dOnFailure;
1851  MyLog(LOGA_DEBUG, "Connecting client d");
1852  rc = MQTTAsync_connect(d, &opts);
1853  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1854  if (rc != MQTTASYNC_SUCCESS)
1855  {
1856  failures++;
1857  goto exit;
1858  }
1859 
1860  /* wait until d is ready: connected and subscribed */
      /* count is reset only here; the later wait loops keep incrementing the
       * same counter, so all waits share one 10000-iteration budget. */
1861  count = 0;
1862  while (!test5dReady && ++count < 10000)
1863  MySleep(100);
1864  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
1865 
      /* NOTE(review): listing line 1866, the call whose rc is checked next, is missing. */
1867  assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1868 
1869  /* let client c go: connect, and send disconnect command to proxy */
1870  opts.will = &wopts;
      /* Binary will: data pointer plus explicit length, which counts the
       * terminating NUL of the literal. */
1871  opts.will->payload.data = "will message";
1872  opts.will->payload.len = (int)strlen(opts.will->payload.data) + 1;
1873  opts.will->qos = 1;
1874  opts.will->retained = 0;
1875  opts.will->topicName = willTopic;
1876  opts.onSuccess5 = test5cOnConnect;
1877  opts.onFailure5 = test5cOnFailure;
1878  opts.context = c;
1879  opts.cleanstart = 0;
1880  opts.MQTTVersion = MQTTVERSION_5;
      /* Session expiry interval of 30, passed as a connect property. */
1881  property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
1882  property.value.integer4 = 30;
1883  MQTTProperties_add(&props, &property);
1884  opts.connectProperties = &props;
1885 
1886  MyLog(LOGA_DEBUG, "Connecting client c");
1887  rc = MQTTAsync_connect(c, &opts);
1888  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1889  MQTTProperties_free(&props);
1890  if (rc != MQTTASYNC_SUCCESS)
1891  {
1892  failures++;
1893  goto exit;
1894  }
1895 
1896  /* wait for will message */
1897  while (!test5_will_message_received && ++count < 10000)
1898  MySleep(100);
1899 
1900  MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
1901 
1902  test5c_connected = 0;
1903  /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
1904  for (i = 0; i < 5; ++i)
1905  {
1906  char buf[50];
1907 
      /* NOTE(review): listing lines 1908-1909 are missing here.
       * QoS cycles 0,1,2,0,1; payloadlen includes the terminating NUL. */
1910  sprintf(buf, "QoS %d message", i);
1911  pubmsg.payload = buf;
1912  pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
1913  pubmsg.qos = i % 3;
1914  pubmsg.retained = 0;
1915  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
      /* The first three sends fit within maxBufferedMessages = 3; the
       * remaining two are expected to be rejected. */
1916  if (i <= 2)
1917  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1918  else
1919  assert("Bad rc from sendMessage", rc == MQTTASYNC_MAX_BUFFERED_MESSAGES, "rc was %d ", rc);
1920  }
1921 
1923 
1924  rc = MQTTAsync_reconnect(c);
1925  assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1926 
1927  /* wait for client to be reconnected */
1928  while (!test5c_connected && ++count < 10000)
1929  MySleep(100);
1930 
1931  /* wait for success or failure callback */
1932  while (test5_messages_received < 3 && ++count < 10000)
1933  MySleep(100);
1934 
1936 
1937  rc = MQTTAsync_disconnect(c, NULL);
1938  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1939 
1940  rc = MQTTAsync_disconnect(d, NULL);
1941  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
1942 
      /* Common cleanup: short pause, destroy both clients, log the verdict. */
1943 exit:
1944  MySleep(200);
1945  MQTTAsync_destroy(&c);
1946  MQTTAsync_destroy(&d);
1947  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
1948  (failures == 0) ? "passed" : "failed", testname, tests, failures);
1950  return failures;
1951 }
1952 
1953 
1954 /*********************************************************************
1955 
1956 Test7: Fill up TCP buffer with QoS 0 messages
1957 
1958 *********************************************************************/
/* Set to 1 by two of the test7 callbacks; test7() polls it while waiting for
 * client d to become ready. */
1964 int test7dReady = 0;
1965 
/**
 * Message-arrived callback used by test7 (installed on client d with
 * MQTTAsync_setCallbacks in test7()).
 *
 * Logs the topic and payload, compares the payload with the text
 * "will message", then releases the message and the topic name.
 *
 * @param context   callback context; cast to MQTTAsync but not otherwise used
 *                  in the lines visible here
 * @param topicName topic the message arrived on; freed with MQTTAsync_free
 * @param topicLen  not used in the lines visible here
 * @param message   received message; freed with MQTTAsync_freeMessage
 * @return always 1
 */
1966 int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
1967 {
1968  MQTTAsync c = (MQTTAsync)context;
1969  static int message_count = 0;
1970 
1971  MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
1972 
      /* NOTE(review): listing lines 1974 and 1976 (the bodies of this if/else)
       * are missing from this extract.
       * NOTE(review): memcmp reads message->payloadlen bytes from the 13-byte
       * literal; a payload longer than that reads past the end of the literal. */
1973  if (memcmp(message->payload, "will message", message->payloadlen) == 0)
1975  else
1977 
1978  MQTTAsync_freeMessage(&message);
1979  MQTTAsync_free(topicName);
1980 
1981  return 1;
1982 }
1983 
1984 void test7cConnected(void* context, char* cause)
1985 {
1986  MQTTAsync c = (MQTTAsync)context;
1987 
1988  MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
1989  test7c_connected = 1;
1990 }
1991 
1993 {
1994  MyLog(LOGA_DEBUG, "In c connect onFailure callback, context %p", context);
1995 
1997  test7Finished = 1;
1998 }
1999 
2001 {
2002  MQTTAsync c = (MQTTAsync)context;
2004 
2005  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
2006 
2007  /* send a message to the proxy to break the connection */
2008  pubmsg.payload = "TERMINATE";
2009  pubmsg.payloadlen = (int)strlen(pubmsg.payload);
2010  pubmsg.qos = 0;
2011  pubmsg.retained = 0;
2012  //rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
2013  //assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
2014 }
2015 
2017 {
2018  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
2019 
2021  test7Finished = 1;
2022 }
2023 
2024 
2026 {
2027  MQTTAsync c = (MQTTAsync)context;
2028  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
2029  response->reasonCode);
2030  test7dReady = 1;
2031 }
2032 
2033 
2035 {
2036  MQTTAsync c = (MQTTAsync)context;
2038  int qoss[2] = {2, 2};
2039  char* topics[2] = {willTopic, test_topic};
2040 
2041  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
2043  opts.context = c;
2044 
2045  //rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
2046  //assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
2047  //if (rc != MQTTASYNC_SUCCESS)
2048  // test5Finished = 1;
2049  test7dReady = 1;
2050 }
2051 
2052 
/**
 * Test 7: fill up the TCP buffer with QoS 0 messages.
 *
 * Visible flow: create two clients, connect d and wait for test7dReady
 * (bailing out early if test7Finished is set), connect c with a binary will
 * and wait for test7c_connected, then publish up to 50000 QoS 0 messages of
 * 5,000,000 bytes each through c, stopping at the first non-zero return
 * code. The reconnect/verify phase is compiled out with #if 0. Both clients
 * are then disconnected and destroyed.
 *
 * NOTE(review): this extract is missing several listing lines (2058-2060,
 * 2067-2068, 2070, 2082, 2085, 2095, 2112-2113, 2133, 2143-2144, 2176-2177,
 * 2195, 2208, 2223), so some declarations and calls are not visible here.
 * NOTE(review): test7 does not appear in the function table built in main().
 *
 * @param options test options; not read in the visible lines
 * @return the global failures count
 */
2053 int test7(struct Options options)
2054 {
2055  char* testname = "test7";
2056  int subsqos = 2;
2057  MQTTAsync c, d;
      /* NOTE(review): c and d have no initializer; the declarations of
       * createOpts, opts, wopts, pubmsg and pubopts used below are on listing
       * lines not present in this extract. */
2061  int rc = 0;
2062  int count = 0;
2063  char clientidc[70];
2064  char clientidd[70];
2065  int i = 0;
2066 
2069  test7Finished = 0;
2071  test7c_connected = 0;
2072 
      /* Note the client ids and test topic use the prefix "paho-test9-7",
       * while the will topic uses "paho-test95-7". */
2073  sprintf(willTopic, "paho-test95-7-%s", unique);
2074  sprintf(clientidc, "paho-test9-7-c-%s", unique);
2075  sprintf(clientidd, "paho-test9-7-d-%s", unique);
2076  sprintf(test_topic, "longer paho-test9-7-test topic %s", unique);
2077 
2078  test7Finished = 0;
2079  failures = 0;
2080  MyLog(LOGA_INFO, "Starting Offline buffering 7 - fill TCP buffer");
2081  fprintf(xml, "<testcase classname=\"test7\" name=\"%s\"", testname);
2083 
      /* NOTE(review): the first line of each create call below (listing lines
       * 2085 and 2095) is missing; presumably they create c and d - confirm. */
2084  createOpts.MQTTVersion = MQTTVERSION_5;
2086  NULL, &createOpts);
2087  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
2088  if (rc != MQTTASYNC_SUCCESS)
2089  {
      /* NOTE(review): in this function the exit path also calls
       * MQTTAsync_disconnect on c and d before destroying them, so it runs
       * on an already-destroyed c and on a d that has not been assigned in
       * the visible lines. */
2090  MQTTAsync_destroy(&c);
2091  goto exit;
2092  }
2093 
2094  createOpts.MQTTVersion = MQTTVERSION_5;
2096  NULL, &createOpts);
2097  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
2098  if (rc != MQTTASYNC_SUCCESS)
2099  {
2100  MQTTAsync_destroy(&c);
2101  goto exit;
2102  }
2103 
      /* NOTE(review): the clients are created with MQTTVERSION_5, yet this
       * test sets cleansession where the other tests set cleanstart - confirm
       * against the missing lines. */
2104  opts.keepAliveInterval = 20;
2105  opts.cleansession = 1;
2106 
2107  rc = MQTTAsync_setCallbacks(d, d, NULL, test7_messageArrived, NULL);
2108  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
2109 
2110  opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
2111  opts.context = d;
      /* NOTE(review): listing lines 2112-2113 are missing; the other tests
       * assign the connect success/failure callbacks at this point. */
2114  MyLog(LOGA_DEBUG, "Connecting client d");
2115  rc = MQTTAsync_connect(d, &opts);
2116  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
2117  if (rc != MQTTASYNC_SUCCESS)
2118  {
2119  failures++;
2120  goto exit;
2121  }
2122 
2123  /* wait until d is ready: connected and subscribed */
2124  count = 0;
2125  while (!test7dReady && ++count < 10000)
2126  {
      /* A failure callback sets test7Finished; stop waiting in that case. */
2127  if (test7Finished)
2128  goto exit;
2129  MySleep(100);
2130  }
2131  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
2132 
      /* NOTE(review): listing line 2133, the call whose rc is checked next, is missing. */
2134  assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
2135 
2136  /* let client c go: connect, and send disconnect command to proxy */
2137  opts.will = &wopts;
      /* Binary will: data pointer plus explicit length, which counts the
       * terminating NUL of the literal. */
2138  opts.will->payload.data = "will message";
2139  opts.will->payload.len = (int)strlen(opts.will->payload.data) + 1;
2140  opts.will->qos = 1;
2141  opts.will->retained = 0;
2142  opts.will->topicName = willTopic;
2145  opts.context = c;
2146  opts.cleansession = 0;
2147  /*opts.automaticReconnect = 1;
2148  opts.minRetryInterval = 3;
2149  opts.maxRetryInterval = 6;*/
2150 
2151  MyLog(LOGA_DEBUG, "Connecting client c");
2152  rc = MQTTAsync_connect(c, &opts);
2153  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
2154  if (rc != MQTTASYNC_SUCCESS)
2155  {
2156  failures++;
2157  goto exit;
2158  }
2159 
2160  count = 0;
2161  while (!test7c_connected && ++count < 10000)
2162  MySleep(100);
2163  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
2164 
2165  /* wait for will message */
2166  //while (test7_will_message_received == 0 && ++count < 10000)
2167  // MySleep(100);
2168 
2169  MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP");
2170 
2171  test7c_connected = 0;
      /* NOTE(review): 5,000,000-byte array with automatic storage; only the
       * short text written by sprintf is initialised, but the full array
       * length is sent as the payload. */
2172  char buf[5000000];
2173  /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
2174  for (i = 0; i < 50000; ++i)
2175  {
2178  pubmsg.qos = 0; /*i % 3;*/
2179  sprintf(buf, "QoS %d message", pubmsg.qos);
2180  pubmsg.payload = buf;
2181  pubmsg.payloadlen = 5000000; //(int)(strlen(pubmsg.payload) + 1);
2182  pubmsg.retained = 0;
2183  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &pubopts);
2184  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
      /* Stop at the first send that does not return 0. */
2185  if (rc != 0)
2186  {
2187  //MyLog(LOGA_DEBUG, "Connecting client c");
2188  //rc = MQTTAsync_connect(c, &opts);
2189  //MySleep(1000);
2190  break;
2191  }
2192  }
2193 
      /* Disabled reconnect/verify phase; it still refers to the test5 flags. */
2194 #if 0
2196 
2197  rc = MQTTAsync_reconnect(c);
2198  assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
2199 
2200  /* wait for client to be reconnected */
2201  while (!test5c_connected && ++count < 10000)
2202  MySleep(100);
2203 
2204  /* wait for success or failure callback */
2205  while (test5_messages_received < 3 && ++count < 10000)
2206  MySleep(100);
2207 
2209 #endif
2210 
      /* Cleanup: unlike tests 4-6, the disconnects sit after the exit label,
       * so every early goto also runs them. */
2211 exit:
2212  rc = MQTTAsync_disconnect(c, NULL);
2213  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
2214 
2215  rc = MQTTAsync_disconnect(d, NULL);
2216  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
2217 
2218  MySleep(200);
2219  MQTTAsync_destroy(&c);
2220  MQTTAsync_destroy(&d);
2221  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
2222  (failures == 0) ? "passed" : "failed", testname, tests, failures);
2224  return failures;
2225 }
2226 
2227 
2228 
2229 void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
2230 {
2231  printf("%s\n", message);
2232 }
2233 
2234 
/**
 * Test driver entry point.
 *
 * Seeds the random generator, derives the per-run unique suffix, opens the
 * XML report file, parses the command line with getopts(), then runs either
 * every test in the table or only the one selected by options.test_no, and
 * finally logs the verdict and closes the report.
 *
 * NOTE(review): this extract is missing listing lines 2249, 2254, 2257 and
 * 2263, including the loop header of the run-all branch.
 *
 * @param argc argument count, forwarded to getopts()
 * @param argv argument vector, forwarded to getopts()
 * @return accumulated failure count (0 when everything passed)
 */
2235 int main(int argc, char** argv)
2236 {
      /* Taken before the local array of the same name is declared, so this
       * refers to the outer `tests` object. */
2237  int* numtests = &tests;
2238  int rc = 0;
      /* Index 0 is a placeholder so that test numbers start at 1.
       * NOTE(review): test7 is not listed in this table. */
2239  int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6 };
2240  time_t randtime;
2241 
2242  srand((unsigned) time(&randtime));
      /* NOTE(review): rand() returns int but is printed with %u. */
2243  sprintf(unique, "%u", rand());
2244  MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
2245 
      /* NOTE(review): the fopen result is used without a NULL check. */
2246  xml = fopen("TEST-test9.xml", "w");
2247  fprintf(xml, "<testsuite name=\"test9\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
2248 
2250  getopts(argc, argv);
2251 
2252  if (options.test_no == 0)
2253  { /* run all the tests */
      /* NOTE(review): the loop header (listing line 2254) is missing here. */
2255  {
2256  failures = 0;
2258  rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
2259  }
2260  }
2261  else
2262  {
      /* NOTE(review): no range check on options.test_no is visible before
       * it indexes the table; listing line 2263 is missing. */
2264  rc = tests[options.test_no](options); /* run just the selected test */
2265  }
2266 
2267  MyLog(LOGA_INFO, "Total tests run: %d", *numtests);
2268  if (rc == 0)
2269  MyLog(LOGA_INFO, "verdict pass");
2270  else
2271  MyLog(LOGA_INFO, "verdict fail");
2272 
2273  fprintf(xml, "</testsuite>\n");
2274  fclose(xml);
2275 
2276  return rc;
2277 }
#define assert(a, b, c, d)
Definition: test95.c:203
MQTTProperties properties
Definition: MQTTAsync.h:316
int test7Finished
Definition: test95.c:1962
int test6(struct Options options)
Definition: test95.c:1783
int test7_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test95.c:1966
void test1dOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:459
int test1dReady
Definition: test95.c:446
enum MQTTPropertyCodes value
void logProperties(MQTTProperties *props)
Definition: test95.c:254
void test7cOnConnectSuccess(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:2000
int MQTTAsync_createWithOptions(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTAsync_createOptions *options)
Definition: MQTTAsync.c:575
int test3_will_message_received
Definition: test95.c:985
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
void test7cOnConnectFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:1992
void test5donSubscribe(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:1583
void test2cConnected(void *context, char *cause)
Definition: test95.c:804
MQTTProperties props
Definition: paho_c_pub.c:54
int failures
Definition: test95.c:209
char * cur_output
Definition: test95.c:213
int test2_will_message_received
Definition: test95.c:713
void test2cOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:738
char * proxy_connection
Definition: test1.c:51
int test1OnFailureCalled
Definition: test95.c:406
void test7cConnected(void *context, char *cause)
Definition: test95.c:1984
void test3donSubscribe(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:1047
const char * message
Definition: MQTTAsync.h:996
MQTTLenString value
char * connection
const char * topicName
Definition: MQTTAsync.h:994
int test3dReady
Definition: test95.c:1043
int main(int argc, char **argv)
Definition: test95.c:2235
void MQTTProperties_free(MQTTProperties *props)
int test1c_connected
Definition: test95.c:477
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
MQTTClient d
Definition: test10.c:1656
char unique[50]
Definition: test95.c:42
void test4dOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:1286
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
void MySleep(long milliseconds)
Definition: test95.c:142
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
void test2donSubscribe(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:775
void test7dOnConnectFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:2016
struct pubsub_opts opts
Definition: paho_c_pub.c:42
int test7_will_message_received
Definition: test95.c:1960
int MQTTProperties_add(MQTTProperties *props, const MQTTProperty *prop)
void * MQTTAsync
Definition: MQTTAsync.h:239
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
void MQTTAsync_free(void *memory)
Definition: MQTTAsync.c:2626
int MQTTProperty_getType(enum MQTTPropertyCodes value)
char output[3000]
Definition: test95.c:212
enum MQTTReasonCodes reasonCode
Definition: MQTTAsync.h:583
void MQTTAsync_freeMessage(MQTTAsync_message **message)
Definition: MQTTAsync.c:2615
int test4_will_message_received
Definition: test95.c:1252
int MQTTAsync_setConnected(MQTTAsync handle, void *context, MQTTAsync_connected *connected)
Definition: MQTTAsync.c:3178
int test2_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test95.c:716
void test4cOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:1294
int test7c_connected
Definition: test95.c:1959
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:720
int test4_messages_received
Definition: test95.c:1253
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
Definition: MQTTAsync.c:4903
void test5cOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:1562
void myassert(char *filename, int lineno, char *description, int value, char *format,...)
Definition: test95.c:229
int test4(struct Options options)
Definition: test95.c:1353
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
void test2dOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:784
int test2Finished
Definition: test95.c:734
void test7donSubscribe(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:2025
MQTTASYNC_TRACE_LEVELS
Definition: MQTTAsync.h:1650
static char msg_buf[512]
Definition: Log.c:122
void test5dOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:1554
int test1_messages_received
Definition: test95.c:370
struct Options options
int test4OnFailureCalled
Definition: test95.c:1276
int test1Finished
Definition: test95.c:404
#define MQTTAsync_willOptions_initializer
Definition: MQTTAsync.h:1014
constexpr size_t count()
Definition: core.h:960
#define LOGA_INFO
Definition: test95.c:101
int test3c_connected
Definition: test95.c:1074
void test3dOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:1018
int test5dReady
Definition: test95.c:1579
void test1cConnected(void *context, char *cause)
Definition: test95.c:479
#define MQTTAsync_connectOptions_initializer5
Definition: MQTTAsync.h:1338
int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
Definition: MQTTAsync.c:4737
int test4c_connected
Definition: test95.c:1342
const char * MQTTPropertyName(enum MQTTPropertyCodes value)
description
Definition: setup.py:19
void test2dOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:746
int message_count
Definition: test5.c:72
int test3OnFailureCalled
Definition: test95.c:1008
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.h:1327
#define ARRAY_SIZE(a)
Definition: test95.c:44
int test3(struct Options options)
Definition: test95.c:1085
enum MQTTPropertyCodes identifier
struct MQTTAsync_willOptions::@54 payload
#define START_TIME_TYPE
Definition: test95.c:167
void usage(void)
Definition: test95.c:46
int test5Finished
Definition: test95.c:1524
#define MQTTAsync_createOptions_initializer
Definition: MQTTAsync.h:965
void test5cOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:1546
char willTopic[100]
Definition: test95.c:447
int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char *const *topic, int *qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4004
MQTTAsync_token token
Definition: MQTTAsync.h:714
void test4donSubscribe(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:1315
long elapsed(START_TIME_TYPE start_time)
Definition: test95.c:193
int test5c_connected
Definition: test95.c:1526
#define MQTTASYNC_MAX_BUFFERED_MESSAGES
Definition: MQTTAsync.h:167
int MQTTAsync_token
Definition: MQTTAsync.h:249
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char *message)
Definition: test95.c:2229
int test4Finished
Definition: test95.c:1274
int test2_messages_received
Definition: test95.c:714
MQTTAsync_willOptions * will
Definition: MQTTAsync.h:1214
void test4cConnected(void *context, char *cause)
Definition: test95.c:1344
#define MQTTCLIENT_PERSISTENCE_DEFAULT
void test7dOnConnectSuccess(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:2034
void test4dOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:1324
const char * name
int test4_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test95.c:1255
MQTTProperty * array
int test7(struct Options options)
Definition: test95.c:2053
void test1donSubscribe(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:450
START_TIME_TYPE global_start_time
Definition: test95.c:211
void test3cConnected(void *context, char *cause)
Definition: test95.c:1076
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
int test5(struct Options options)
Definition: test95.c:1619
int tests
Definition: test95.c:208
int test4dReady
Definition: test95.c:1311
void test1dOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:416
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
#define LOGA_DEBUG
Definition: test95.c:100
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
START_TIME_TYPE start_clock(void)
Definition: test95.c:169
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
Definition: MQTTAsync.c:4897
int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const MQTTAsync_message *message, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4328
MQTTClient c
Definition: test10.c:1656
float time
Definition: mqtt_test.py:17
dictionary context
Definition: test2.py:57
void test1cOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:424
int test3Finished
Definition: test95.c:1006
int test3_messages_received
Definition: test95.c:986
int test3_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test95.c:988
null localtime_s(...)
Definition: chrono.h:286
void assert3PendingTokens(MQTTAsync c)
Definition: test95.c:318
void MyLog(int LOGA_level, char *format,...)
Definition: test95.c:105
void test3cOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:1026
int MQTTAsync_reconnect(MQTTAsync handle)
Definition: MQTTAsync.c:1545
MQTTProperties * willProperties
Definition: MQTTAsync.h:1315
void waitForNoPendingTokens(MQTTAsync c)
Definition: test95.c:293
void test3dOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:1056
int test7_messages_received
Definition: test95.c:1961
int test2c_connected
Definition: test95.c:802
void test4cOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:1278
int test1_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test95.c:372
const void * data
Definition: MQTTAsync.h:1010
void write_test_result(void)
Definition: test95.c:216
void test1cOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:408
int test2OnFailureCalled
Definition: test95.c:736
int test1(struct Options options)
Definition: test95.c:488
int test7dReady
Definition: test95.c:1964
char *const * serverURIs
Definition: MQTTAsync.h:1277
char test_topic[100]
Definition: test95.c:448
int test5_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test95.c:1528
int test5OnFailureCalled
Definition: test95.c:1525
int test5_messages_received
Definition: test95.c:1523
void getopts(int argc, char **argv)
Definition: test95.c:66
#define MQTTAsync_message_initializer
Definition: MQTTAsync.h:319
int test1_will_message_received
Definition: test95.c:369
char * topics[]
int test5_will_message_received
Definition: test95.c:1522
MQTTProperty property
Definition: paho_c_pub.c:53
enum MQTTReasonCodes rc
Definition: test10.c:1112
void test3cOnFailure(void *context, MQTTAsync_failureData5 *response)
Definition: test95.c:1010
void test5dOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:1592
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:1321
void test5cConnected(void *context, char *cause)
Definition: test95.c:1610
int test2(struct Options options)
Definition: test95.c:813
int test7OnFailureCalled
Definition: test95.c:1963
int test_no
Definition: test1.c:54
FILE * xml
Definition: test95.c:210
MQTTProperties * connectProperties
Definition: MQTTAsync.h:1311
int test2dReady
Definition: test95.c:771
void test2cOnConnect(void *context, MQTTAsync_successData5 *response)
Definition: test95.c:754
#define MQTTProperties_initializer


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48