test8.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2012, 2020 IBM Corp. and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  *******************************************************************************/
16 
23 #include "MQTTAsync.h"
24 #include <string.h>
25 #include <stdlib.h>
26 
27 #if !defined(_WINDOWS)
28  #include <sys/time.h>
29  #include <sys/socket.h>
30  #include <unistd.h>
31  #include <errno.h>
32 #else
33  #include <windows.h>
34 #endif
35 
36 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
37 
/**
 * Print the command-line options understood by getopts() and terminate.
 *
 * getopts() calls this when an option that needs a value is the last
 * argument on the command line. The function does not return: the process
 * exits with EXIT_FAILURE.
 */
void usage(void)
{
	printf("Options:\n"
		"  --test_no <n>        run only test number n (default: run all tests)\n"
		"  --size <bytes>       payload size for the big message test (default 5000000)\n"
		"  --connection <uri>   MQTT server URI (default tcp://localhost:1883)\n"
		"  --verbose            also print debug level log messages\n");
	exit(EXIT_FAILURE);
}
43 
/**
 * Settings for a test run; each field can be overridden on the command line
 * (see getopts()).
 */
struct Options
{
	char* connection;	/* MQTT server URI, set by --connection */
	int verbose;		/* non-zero: MyLog() also prints LOGA_DEBUG messages */
	int test_no;		/* test to run, set by --test_no; -1 means run them all */
	int size;		/* set by --size; number of payload bytes compared by test 4 */
};

/* The single global instance, pre-loaded with the defaults. */
struct Options options =
{
	.connection = "tcp://localhost:1883",
	.verbose = 0,
	.test_no = -1,
	.size = 5000000,
};
57 
58 void getopts(int argc, char** argv)
59 {
60  int count = 1;
61 
62  while (count < argc)
63  {
64  if (strcmp(argv[count], "--test_no") == 0)
65  {
66  if (++count < argc)
67  options.test_no = atoi(argv[count]);
68  else
69  usage();
70  }
71  else if (strcmp(argv[count], "--size") == 0)
72  {
73  if (++count < argc)
74  options.size = atoi(argv[count]);
75  else
76  usage();
77  }
78  else if (strcmp(argv[count], "--connection") == 0)
79  {
80  if (++count < argc)
81  options.connection = argv[count];
82  else
83  usage();
84  }
85  else if (strcmp(argv[count], "--verbose") == 0)
86  options.verbose = 1;
87  count++;
88  }
89 }
90 
91 
92 #define LOGA_DEBUG 0
93 #define LOGA_INFO 1
94 #include <stdarg.h>
95 #include <time.h>
96 #include <sys/timeb.h>
97 void MyLog(int LOGA_level, char* format, ...)
98 {
99  static char msg_buf[256];
100  va_list args;
101 #if defined(_WIN32) || defined(_WINDOWS)
102  struct timeb ts;
103 #else
104  struct timeval ts;
105 #endif
106  struct tm timeinfo;
107 
108  if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
109  return;
110 
111 #if defined(_WIN32) || defined(_WINDOWS)
112  ftime(&ts);
113  localtime_s(&timeinfo, &ts.time);
114 #else
115  gettimeofday(&ts, NULL);
116  localtime_r(&ts.tv_sec, &timeinfo);
117 #endif
118  strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
119 
120 #if defined(_WIN32) || defined(_WINDOWS)
121  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
122 #else
123  sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000);
124 #endif
125 
126  va_start(args, format);
127  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
128  va_end(args);
129 
130  printf("%s\n", msg_buf);
131  fflush(stdout);
132 }
133 
134 
135 #if defined(_WIN32) || defined(_WINDOWS)
136 #define mqsleep(A) Sleep(1000*A)
137 #define START_TIME_TYPE DWORD
138 static DWORD start_time = 0;
140 {
141  return GetTickCount();
142 }
143 #elif defined(AIX)
144 #define mqsleep sleep
145 #define START_TIME_TYPE struct timespec
147 {
148  static struct timespec start;
149  clock_gettime(CLOCK_REALTIME, &start);
150  return start;
151 }
152 #else
153 #define mqsleep sleep
154 #define START_TIME_TYPE struct timeval
155 /* TODO - unused - remove? static struct timeval start_time; */
157 {
158  struct timeval start_time;
159  gettimeofday(&start_time, NULL);
160  return start_time;
161 }
162 #endif
163 
164 
165 #if defined(_WIN32)
166 long elapsed(START_TIME_TYPE start_time)
167 {
168  return GetTickCount() - start_time;
169 }
170 #elif defined(AIX)
171 #define assert(a)
172 long elapsed(struct timespec start)
173 {
174  struct timespec now, res;
175 
176  clock_gettime(CLOCK_REALTIME, &now);
177  ntimersub(now, start, res);
178  return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
179 }
180 #else
181 long elapsed(START_TIME_TYPE start_time)
182 {
183  struct timeval now, res;
184 
185  gettimeofday(&now, NULL);
186  timersub(&now, &start_time, &res);
187  return (res.tv_sec)*1000 + (res.tv_usec)/1000;
188 }
189 #endif
190 
191 
193 
194 
195 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
196 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
197 
198 
199 int tests = 0;
200 int failures = 0;
201 
202 
203 void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
204 {
205  ++tests;
206  if (!value)
207  {
208  va_list args;
209 
210  ++failures;
211  printf("Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
212 
213  va_start(args, format);
214  vprintf(format, args);
215  va_end(args);
216  }
217  else
218  MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
219 }
220 
221 volatile int test_finished = 0;
222 
223 char* test_topic = "async test topic";
224 
225 
227 {
228  MQTTAsync c = (MQTTAsync)context;
229  MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
230  test_finished = 1;
231 }
232 
233 
235 {
236  MQTTAsync c = (MQTTAsync)context;
238  int rc;
239 
240  MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c);
242  opts.context = c;
243 
244  rc = MQTTAsync_disconnect(c, &opts);
245  assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
246 }
247 
248 
249 int test1_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
250 {
251  MQTTAsync c = (MQTTAsync)context;
252  static int message_count = 0;
253  int rc;
254 
255  MyLog(LOGA_DEBUG, "In messageArrived callback %p", c);
256 
257  if (++message_count == 1)
258  {
261  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
262  pubmsg.payloadlen = 11;
263  pubmsg.qos = 2;
264  pubmsg.retained = 0;
265  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
266  }
267  else
268  {
270 
272  opts.context = c;
273  rc = MQTTAsync_unsubscribe(c, test_topic, &opts);
274  assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
275  }
276 
277  MQTTAsync_freeMessage(&message);
278  MQTTAsync_free(topicName);
279 
280  return 1;
281 }
282 
284 {
285  MQTTAsync c = (MQTTAsync)context;
287  int rc;
288 
289  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
290 
291  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
292  pubmsg.payloadlen = 11;
293  pubmsg.qos = 2;
294  pubmsg.retained = 0;
295 
296  rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
297 }
298 
299 
301 {
302  MQTTAsync c = (MQTTAsync)context;
304  int rc;
305 
306  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
308  opts.context = c;
309 
310  rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
311  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
312  if (rc != MQTTASYNC_SUCCESS)
313  test_finished = 1;
314 }
315 
316 
318 {
319  MQTTAsync c = (MQTTAsync)context;
321 
322  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
323 
324  test_finished = 1;
325 }
326 
327 
328 /*********************************************************************
329 
330 Test1: Basic connect, subscribe send and receive.
331 
332 *********************************************************************/
333 int test1(struct Options options)
334 {
335  int subsqos = 2;
336  MQTTAsync c;
339  int rc = 0;
340  char* test_topic = "C client test1";
341  char* serverURIs[2] = {"tcp://localhost:1882", options.connection};
342 
343  failures = 0;
344  MyLog(LOGA_INFO, "Starting test 1 - asynchronous connect");
345 
346  rc = MQTTAsync_create(&c, options.connection, "async_8_test1",
348  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
349  if (rc != MQTTASYNC_SUCCESS)
350  {
351  MQTTAsync_destroy(&c);
352  goto exit;
353  }
354 
355  rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
356  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
357 
358  opts.keepAliveInterval = 20;
359  opts.cleansession = 1;
360  opts.username = "testuser";
361  opts.password = "testpassword";
362 
363  opts.will = &wopts;
364  opts.will->message = "will message";
365  opts.will->qos = 1;
366  opts.will->retained = 0;
367  opts.will->topicName = "will topic";
368  opts.will = NULL;
369  opts.onSuccess = test1_onConnect;
371  opts.context = c;
372  opts.serverURIcount = 2;
373  opts.serverURIs = serverURIs;
374 
375  MyLog(LOGA_DEBUG, "Connecting");
376  rc = MQTTAsync_connect(c, &opts);
377  rc = 0;
378  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
379  if (rc != MQTTASYNC_SUCCESS)
380  goto exit;
381 
382  while (!test_finished)
383  #if defined(_WIN32)
384  Sleep(100);
385  #else
386  usleep(10000L);
387  #endif
388 
389  MQTTAsync_destroy(&c);
390 
391 exit:
392  MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
393  (failures == 0) ? "passed" : "failed", tests, failures);
394 
395  return failures;
396 }
397 
399 
401 {
402  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
403 
405  test_finished = 1;
406 }
407 
408 
410 {
411 
412  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p\n", context);
413 
414  assert("Connect should not succeed", 0, "connect success callback was called", 0);
415 
416  test_finished = 1;
417 }
418 
419 /*********************************************************************
420 
421 Test2: connect timeout
422 
423 *********************************************************************/
424 int test2(struct Options options)
425 {
426  int subsqos = 2;
427  MQTTAsync c;
430  int rc = 0;
431  char* test_topic = "C client test2";
432 
433  test_finished = 0;
434 
435  MyLog(LOGA_INFO, "Starting test 2 - connect timeout");
436 
437  rc = MQTTAsync_create(&c, "tcp://9.20.96.160:66", "connect timeout",
439  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
440  if (rc != MQTTASYNC_SUCCESS)
441  {
442  MQTTAsync_destroy(&c);
443  goto exit;
444  }
445 
446  rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
447  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
448 
449  opts.connectTimeout = 5;
450  opts.keepAliveInterval = 20;
451  opts.cleansession = 1;
452  opts.username = "testuser";
453  opts.password = "testpassword";
454 
455  opts.will = &wopts;
456  opts.will->message = "will message";
457  opts.will->qos = 1;
458  opts.will->retained = 0;
459  opts.will->topicName = "will topic";
460  opts.will = NULL;
461  opts.onSuccess = test2_onConnect;
462  opts.onFailure = test2_onFailure;
463  opts.context = c;
464 
465  MyLog(LOGA_DEBUG, "Connecting");
466  rc = MQTTAsync_connect(c, &opts);
467  rc = 0;
468  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
469  if (rc != MQTTASYNC_SUCCESS)
470  goto exit;
471 
472  while (!test_finished)
473  #if defined(_WIN32)
474  Sleep(100);
475  #else
476  usleep(10000L);
477  #endif
478 
479  MQTTAsync_destroy(&c);
480 
481 exit:
482  assert("Connect onFailure should be called once", test2_onFailure_called == 1,
483  "connect onFailure was called %d times", test2_onFailure_called);
484 
485  MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.",
486  (failures == 0) ? "passed" : "failed", tests, failures);
487 
488  return failures;
489 }
490 
491 
492 typedef struct
493 {
494  MQTTAsync c;
495  int index;
496  char clientid[24];
497  char test_topic[100];
498  int message_count;
499 } client_data;
500 
501 
503 {
504  client_data* cd = (client_data*)context;
505  MyLog(LOGA_DEBUG, "In onDisconnect callback for client \"%s\"", cd->clientid);
506  test_finished++;
507 }
508 
509 
511 {
512  client_data* cd = (client_data*)context;
513  MyLog(LOGA_DEBUG, "In QoS 0 onPublish callback for client \"%s\"", cd->clientid);
514 }
515 
516 
518 {
519  client_data* cd = (client_data*)context;
521  int rc;
522 
523  MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback \"%s\"", cd->clientid);
525  opts.context = cd;
526 
527  rc = MQTTAsync_disconnect(cd->c, &opts);
528  assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
529 }
530 
531 
532 int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
533 {
534  client_data* cd = (client_data*)context;
535  int rc;
536 
537  MyLog(LOGA_DEBUG, "In messageArrived callback \"%s\" message count ", cd->clientid);
538 
539  if (++cd->message_count == 1)
540  {
543  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
544  pubmsg.payloadlen = 25;
545  pubmsg.qos = 1;
546  pubmsg.retained = 0;
547  rc = MQTTAsync_sendMessage(cd->c, cd->test_topic, &pubmsg, &opts);
548  assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
549  }
550  else if (cd->message_count == 2)
551  {
554  pubmsg.payload = "a QoS 0 message that we can shorten to the extent that we need to payload up to 11";
555  pubmsg.payloadlen = 29;
556  pubmsg.qos = 0;
557  pubmsg.retained = 0;
558  opts.context = cd;
559  opts.onSuccess = test3_onPublish;
560 
561  rc = MQTTAsync_sendMessage(cd->c, cd->test_topic, &pubmsg, &opts);
562  assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
563  }
564  else
565  {
567 
569  opts.context = cd;
570  rc = MQTTAsync_unsubscribe(cd->c, cd->test_topic, &opts);
571  assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
572  }
573  MQTTAsync_freeMessage(&message);
574  MQTTAsync_free(topicName);
575  return 1;
576 }
577 
579 {
580  client_data* cd = (client_data*)context;
582  int rc;
583 
584  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback \"%s\"", cd->clientid);
585 
586  pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
587  pubmsg.payloadlen = 11;
588  pubmsg.qos = 2;
589  pubmsg.retained = 0;
590 
591  rc = MQTTAsync_send(cd->c, cd->test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
592  assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
593 }
594 
595 
597 {
598  client_data* cd = (client_data*)context;
600  int rc;
601 
602  MyLog(LOGA_DEBUG, "In connect onSuccess callback, \"%s\"", cd->clientid);
604  opts.context = cd;
605 
606  rc = MQTTAsync_subscribe(cd->c, cd->test_topic, 2, &opts);
607  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
608  if (rc != MQTTASYNC_SUCCESS)
609  test_finished++;
610 }
611 
612 
614 {
615  client_data* cd = (client_data*)context;
617 
618  assert("Should have connected", 0, "failed to connect", NULL);
619  MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\" rc %d\n", cd->clientid, response->code);
620  if (response->message)
621  MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\"\n", response->message);
622 
623  test_finished++;
624 }
625 
626 
627 /*********************************************************************
628 
629 Test3: More than one client object - simultaneous working.
630 
631 *********************************************************************/
632 int test3(struct Options options)
633 {
634  #define TEST3_CLIENTS 10
636  int subsqos = 2;
639  int rc = 0;
640  int i;
641  client_data clientdata[TEST3_CLIENTS];
642 
643  test_finished = 0;
644  MyLog(LOGA_INFO, "Starting test 3 - multiple connections");
645 
646  for (i = 0; i < num_clients; ++i)
647  {
648  sprintf(clientdata[i].clientid, "async_test3_num_%d", i);
649  sprintf(clientdata[i].test_topic, "async test3 topic num %d", i);
650  clientdata[i].index = i;
651  clientdata[i].message_count = 0;
652 
653  rc = MQTTAsync_create(&(clientdata[i].c), options.connection, clientdata[i].clientid,
655  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
656 
657  rc = MQTTAsync_setCallbacks(clientdata[i].c, &clientdata[i], NULL, test3_messageArrived, NULL);
658  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
659 
660  opts.keepAliveInterval = 20;
661  opts.cleansession = 1;
662  opts.username = "testuser";
663  opts.password = "testpassword";
664 
665  opts.will = &wopts;
666  opts.will->message = "will message";
667  opts.will->qos = 1;
668  opts.will->retained = 0;
669  opts.will->topicName = "will topic";
670  opts.onSuccess = test3_onConnect;
671  opts.onFailure = test3_onFailure;
672  opts.context = &clientdata[i];
673 
674  MyLog(LOGA_DEBUG, "Connecting");
675  rc = MQTTAsync_connect(clientdata[i].c, &opts);
676  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
677  }
678 
679  while (test_finished < num_clients)
680  {
681  MyLog(LOGA_DEBUG, "num_clients %d test_finished %d\n", num_clients, test_finished);
682  #if defined(_WIN32)
683  Sleep(100);
684  #else
685  usleep(10000L);
686  #endif
687  }
688 
689  MyLog(LOGA_DEBUG, "TEST3: destroying clients");
690 
691  for (i = 0; i < num_clients; ++i)
692  MQTTAsync_destroy(&clientdata[i].c);
693 
694 /*exit:*/
695  MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
696  (failures == 0) ? "passed" : "failed", tests, failures);
697 
698  return failures;
699 }
700 
701 
702 void* test4_payload = NULL;
704 
706 {
707  MQTTAsync c = (MQTTAsync)context;
708 
709  MyLog(LOGA_DEBUG, "In publish onSuccess callback, context %p", context);
710 }
711 
712 int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
713 {
714  MQTTAsync c = (MQTTAsync)context;
715  static int message_count = 0;
716  int rc, i;
717 
718  MyLog(LOGA_DEBUG, "In messageArrived callback %p", c);
719 
720  assert("Message size correct", message->payloadlen == test4_payloadlen,
721  "message size was %d", message->payloadlen);
722 
723  for (i = 0; i < options.size; ++i)
724  {
725  if (((char*)test4_payload)[i] != ((char*)message->payload)[i])
726  {
727  assert("Message contents correct", ((char*)test4_payload)[i] != ((char*)message->payload)[i],
728  "message content was %c", ((char*)message->payload)[i]);
729  break;
730  }
731  }
732 
733  if (++message_count == 1)
734  {
737 
738  pubmsg.payload = test4_payload;
739  pubmsg.payloadlen = test4_payloadlen;
740  pubmsg.qos = 1;
741  pubmsg.retained = 0;
742  opts.onSuccess = test4_onPublish;
743  opts.context = c;
744 
745  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
746  }
747  else if (message_count == 2)
748  {
751 
752  pubmsg.payload = test4_payload;
753  pubmsg.payloadlen = test4_payloadlen;
754  pubmsg.qos = 0;
755  pubmsg.retained = 0;
756  opts.onSuccess = test4_onPublish;
757  opts.context = c;
758  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
759  }
760  else
761  {
763 
765  opts.context = c;
766  rc = MQTTAsync_unsubscribe(c, test_topic, &opts);
767  assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
768  }
769 
770  MQTTAsync_freeMessage(&message);
771  MQTTAsync_free(topicName);
772 
773  return 1;
774 }
775 
776 
778 {
779  MQTTAsync c = (MQTTAsync)context;
781  int rc, i;
782 
783  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p", c);
784 
787 
788  srand(33);
789  for (i = 0; i < options.size; ++i)
790  ((char*)pubmsg.payload)[i] = rand() % 256;
791 
792  pubmsg.qos = 2;
793  pubmsg.retained = 0;
794 
795  rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
796  assert("Send successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
797 }
798 
799 
801 {
802  MQTTAsync c = (MQTTAsync)context;
804  int rc;
805 
806  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
808  opts.context = c;
809 
810  rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
811  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
812  if (rc != MQTTASYNC_SUCCESS)
813  test_finished = 1;
814 }
815 
816 
817 /*********************************************************************
818 
819 Test4: Send and receive big messages
820 
821 *********************************************************************/
822 int test4(struct Options options)
823 {
824  int subsqos = 2;
825  MQTTAsync c;
828  int rc = 0;
829  char* test_topic = "C client test4";
830 
831  test_finished = failures = 0;
832  MyLog(LOGA_INFO, "Starting test 4 - big messages");
833 
834  rc = MQTTAsync_create(&c, options.connection, "async_test_4",
836  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
837  if (rc != MQTTASYNC_SUCCESS)
838  {
839  MQTTAsync_destroy(&c);
840  goto exit;
841  }
842 
843  rc = MQTTAsync_setCallbacks(c, c, NULL, test4_messageArrived, NULL);
844  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
845 
846  opts.keepAliveInterval = 20;
847  opts.cleansession = 1;
848  opts.username = "testuser";
849  opts.password = "testpassword";
850 
851  opts.will = &wopts;
852  opts.will->message = "will message";
853  opts.will->qos = 1;
854  opts.will->retained = 0;
855  opts.will->topicName = "will topic";
856  opts.will = NULL;
857  opts.onSuccess = test4_onConnect;
858  opts.onFailure = NULL;
859  opts.context = c;
860 
861  MyLog(LOGA_DEBUG, "Connecting");
862  rc = MQTTAsync_connect(c, &opts);
863  rc = 0;
864  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
865  if (rc != MQTTASYNC_SUCCESS)
866  goto exit;
867 
868  while (!test_finished)
869  #if defined(_WIN32)
870  Sleep(100);
871  #else
872  usleep(1000L);
873  #endif
874 
875  MQTTAsync_destroy(&c);
876 
877 exit:
878  MyLog(LOGA_INFO, "TEST4: test %s. %d tests run, %d failures.",
879  (failures == 0) ? "passed" : "failed", tests, failures);
880 
881  return failures;
882 }
883 
884 
887 
889 {
890  MQTTAsync c = (MQTTAsync)context;
891 
892  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
893 
895  test_finished = 1;
896 }
897 
899 {
900  MQTTAsync c = (MQTTAsync)context;
901 
902  MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
903 
905  test_finished = 1;
906 }
907 
908 /*********************************************************************
909 
910 Test5a: All HA connections out of service.
911 
912 *********************************************************************/
913 int test5a(struct Options options)
914 {
915  MQTTAsync c;
917  int rc = 0;
918  char* test_topic = "C client test5a";
919  char* serverURIs[3] = {"tcp://localhost:1880", "tcp://localhost:1881", "tcp://localhost:1882"};
920 
921  failures = 0;
922  MyLog(LOGA_INFO, "Starting test 5a - All HA connections out of service");
923 
924  rc = MQTTAsync_create(&c, "rubbish", "all_ha_down",
926  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
927  if (rc != MQTTASYNC_SUCCESS)
928  {
929  MQTTAsync_destroy(&c);
930  goto exit;
931  }
932 
933  opts.keepAliveInterval = 20;
934  opts.cleansession = 1;
935  opts.username = "testuser";
936  opts.password = "testpassword";
937 
938  opts.onSuccess = test5_onConnect;
940  opts.context = c;
941  opts.serverURIcount = 3;
942  opts.serverURIs = serverURIs;
943 
944  MyLog(LOGA_DEBUG, "Connecting");
945  rc = MQTTAsync_connect(c, &opts);
946  rc = 0;
947  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
948  if (rc != MQTTASYNC_SUCCESS)
949  goto exit;
950 
951  while (!test_finished)
952  #if defined(_WIN32)
953  Sleep(100);
954  #else
955  usleep(10000L);
956  #endif
957 
958  MQTTAsync_destroy(&c);
959 
960 exit:
961  assert("Connect onFailure should be called once", test5_onFailure_called == 1,
962  "connect onFailure was called %d times", test5_onFailure_called);
963 
964  MyLog(LOGA_INFO, "TEST5a: test %s. %d tests run, %d failures.",
965  (failures == 0) ? "passed" : "failed", tests, failures);
966 
967  return failures;
968 }
969 
970 /*********************************************************************
971 
972 Test5b: All HA connections out of service except the last one.
973 
974 *********************************************************************/
975 int test5b(struct Options options)
976 {
977  MQTTAsync c;
979  int rc = 0;
980  char* test_topic = "C client test5b";
981  char* serverURIs[3] = {"tcp://localhost:1880", "tcp://localhost:1881", options.connection};
982 
983  failures = 0;
984  MyLog(LOGA_INFO, "Starting test 5b - All HA connections out of service except the last one");
985 
986  rc = MQTTAsync_create(&c, "rubbish", "all_ha_down_except_last_one",
988  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
989  if (rc != MQTTASYNC_SUCCESS)
990  {
991  MQTTAsync_destroy(&c);
992  goto exit;
993  }
994 
995  opts.keepAliveInterval = 20;
996  opts.cleansession = 1;
997  opts.username = "testuser";
998  opts.password = "testpassword";
999 
1000  opts.onSuccess = test5_onConnect;
1002  opts.context = c;
1003  opts.serverURIcount = 3;
1004  opts.serverURIs = serverURIs;
1005 
1006  MyLog(LOGA_DEBUG, "Connecting");
1007  rc = MQTTAsync_connect(c, &opts);
1008  rc = 0;
1009  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1010  if (rc != MQTTASYNC_SUCCESS)
1011  goto exit;
1012 
1013  while (!test_finished)
1014  #if defined(_WIN32)
1015  Sleep(100);
1016  #else
1017  usleep(10000L);
1018  #endif
1019 
1020  MQTTAsync_destroy(&c);
1021 
1022 exit:
1023  assert("Connect onConnect should be called once", test5_onConnect_called == 1,
1024  "connect onConnect was called %d times", test5_onConnect_called);
1025 
1026  MyLog(LOGA_INFO, "TEST5b: test %s. %d tests run, %d failures.",
1027  (failures == 0) ? "passed" : "failed", tests, failures);
1028 
1029  return failures;
1030 }
1031 
1032 /*********************************************************************
1033 
1034 Test5c: All HA connections out of service except the first one.
1035 
1036 *********************************************************************/
1038 {
1039  MQTTAsync c;
1041  int rc = 0;
1042  char* test_topic = "C client test5c";
1043  char* serverURIs[3] = {options.connection, "tcp://localhost:1881", "tcp://localhost:1882"};
1044 
1045  failures = 0;
1046  MyLog(LOGA_INFO, "Starting test 5c - All HA connections out of service except the first one");
1047 
1048  rc = MQTTAsync_create(&c, "rubbish", "all_ha_down_except_first_one",
1050  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
1051  if (rc != MQTTASYNC_SUCCESS)
1052  {
1053  MQTTAsync_destroy(&c);
1054  goto exit;
1055  }
1056 
1057  opts.keepAliveInterval = 20;
1058  opts.cleansession = 1;
1059  opts.username = "testuser";
1060  opts.password = "testpassword";
1061 
1062  opts.onSuccess = test5_onConnect;
1064  opts.context = c;
1065  opts.serverURIcount = 3;
1066  opts.serverURIs = serverURIs;
1067 
1068  MyLog(LOGA_DEBUG, "Connecting");
1069  rc = MQTTAsync_connect(c, &opts);
1070  rc = 0;
1071  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1072  if (rc != MQTTASYNC_SUCCESS)
1073  goto exit;
1074 
1075  while (!test_finished)
1076  #if defined(_WIN32)
1077  Sleep(100);
1078  #else
1079  usleep(10000L);
1080  #endif
1081 
1082  MQTTAsync_destroy(&c);
1083 
1084 exit:
1085  assert("Connect onConnect should be called once", test5_onConnect_called == 1,
1086  "connect onConnect was called %d times", test5_onConnect_called);
1087 
1088  MyLog(LOGA_INFO, "TEST5c: test %s. %d tests run, %d failures.",
1089  (failures == 0) ? "passed" : "failed", tests, failures);
1090 
1091  return failures;
1092 }
1093 
1094 
1095 void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
1096 {
1097  if (strstr(message, "onnect") && !strstr(message, "isconnect"))
1098  printf("Trace : %d, %s\n", level, message);
1099 }
1100 
1101 
1102 int main(int argc, char** argv)
1103 {
1104  int rc = 0;
1105  int (*tests[])() = {NULL, test1, test2, test3, test4, test5a, test5b, test5c}; /* indexed starting from 1 */
1106  MQTTAsync_nameValue* info;
1107 
1108  getopts(argc, argv);
1109 
1111 
1112  info = MQTTAsync_getVersionInfo();
1113 
1114  while (info->name)
1115  {
1116  MyLog(LOGA_INFO, "%s: %s", info->name, info->value);
1117  info++;
1118  }
1119 
1120  if (options.test_no == -1)
1121  { /* run all the tests */
1123  {
1124  failures = 0;
1126  rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
1127  }
1128  }
1129  else
1130  {
1132  rc = tests[options.test_no](options); /* run just the selected test */
1133  }
1134 
1135  if (failures == 0)
1136  MyLog(LOGA_INFO, "verdict pass");
1137  else
1138  MyLog(LOGA_INFO, "verdict fail");
1139 
1140  return rc;
1141 }
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
char * test_topic
Definition: test8.c:223
int test2(struct Options options)
Definition: test8.c:424
#define ARRAY_SIZE(a)
Definition: test8.c:36
enum MQTTPropertyCodes value
int size
Definition: test11.c:53
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
START_TIME_TYPE start_clock(void)
Definition: test8.c:156
int main(int argc, char **argv)
Definition: test8.c:1102
const char * name
Definition: MQTTAsync.h:1149
union MQTTAsync_successData::@46 alt
void test1_onDisconnect(void *context, MQTTAsync_successData *response)
Definition: test8.c:226
const char * message
Definition: MQTTAsync.h:996
char * connection
const char * topicName
Definition: MQTTAsync.h:994
char test_topic[100]
Definition: test4.c:528
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
int test5_onFailure_called
Definition: test8.c:886
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
struct Options options
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
#define LOGA_INFO
Definition: test8.c:93
struct pubsub_opts opts
Definition: paho_c_pub.c:42
int test5a(struct Options options)
Definition: test8.c:913
MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
Definition: MQTTAsync.c:4909
void * MQTTAsync
Definition: MQTTAsync.h:239
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
void MQTTAsync_free(void *memory)
Definition: MQTTAsync.c:2626
#define malloc(x)
Definition: Heap.h:41
void MQTTAsync_freeMessage(MQTTAsync_message **message)
Definition: MQTTAsync.c:2615
int MQTTAsync_unsubscribe(MQTTAsync handle, const char *topic, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4209
void test3_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test8.c:578
void test3_onFailure(void *context, MQTTAsync_failureData *response)
Definition: test8.c:613
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
Definition: MQTTAsync.c:4903
void test5_onConnect(void *context, MQTTAsync_successData *response)
Definition: test8.c:888
void test2_onConnect(void *context, MQTTAsync_successData *response)
Definition: test8.c:409
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
MQTTASYNC_TRACE_LEVELS
Definition: MQTTAsync.h:1650
int test4(struct Options options)
Definition: test8.c:822
static char msg_buf[512]
Definition: Log.c:122
void test1_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test8.c:283
int test5b(struct Options options)
Definition: test8.c:975
void test1_onConnect(void *context, MQTTAsync_successData *response)
Definition: test8.c:300
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4121
long elapsed(START_TIME_TYPE start_time)
Definition: test8.c:181
const char * message
Definition: MQTTAsync.h:518
void usage(void)
Definition: test8.c:38
int test3_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test8.c:532
#define MQTTAsync_willOptions_initializer
Definition: MQTTAsync.h:1014
constexpr size_t count()
Definition: core.h:960
void test5_onConnectFailure(void *context, MQTTAsync_failureData *response)
Definition: test8.c:898
void test1_onUnsubscribe(void *context, MQTTAsync_successData *response)
Definition: test8.c:234
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTAsync.c:737
void test4_onConnect(void *context, MQTTAsync_successData *response)
Definition: test8.c:800
int test5c(struct Options options)
Definition: test8.c:1037
void myassert(char *filename, int lineno, char *description, int value, char *format,...)
Definition: test8.c:203
void getopts(int argc, char **argv)
Definition: test8.c:58
description
Definition: setup.py:19
#define TEST3_CLIENTS
#define MQTTAsync_disconnectOptions_initializer
Definition: MQTTAsync.h:1422
void test3_onUnsubscribe(void *context, MQTTAsync_successData *response)
Definition: test8.c:517
int message_count
Definition: test5.c:72
#define LOGA_DEBUG
Definition: test8.c:92
char clientid[24]
Definition: test4.c:527
int test3(struct Options options)
Definition: test8.c:632
int failures
Definition: test8.c:200
#define MQTTAsync_connectOptions_initializer
Definition: MQTTAsync.h:1335
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
Definition: test8.c:1095
MQTTAsync_willOptions * will
Definition: MQTTAsync.h:1214
int test5_onConnect_called
Definition: test8.c:885
int index
Definition: test4.c:526
#define MQTTCLIENT_PERSISTENCE_DEFAULT
#define START_TIME_TYPE
Definition: test8.c:154
int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen, const void *payload, int qos, int retained, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4230
void test4_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test8.c:777
void test3_onConnect(void *context, MQTTAsync_successData *response)
Definition: test8.c:596
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
const char * value
Definition: MQTTAsync.h:1150
void MyLog(int LOGA_level, char *format,...)
Definition: test8.c:97
void test3_onDisconnect(void *context, MQTTAsync_successData *response)
Definition: test8.c:502
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
Definition: MQTTAsync.c:4897
int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const MQTTAsync_message *message, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4328
MQTTClient c
Definition: test10.c:1656
dictionary context
Definition: test2.py:57
char * clientid
Definition: test6.c:54
void test1_onConnectFailure(void *context, MQTTAsync_failureData *response)
Definition: test8.c:317
#define num_clients
int test1_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test8.c:249
null localtime_s(...)
Definition: chrono.h:286
#define MQTTCLIENT_PERSISTENCE_NONE
int message_count
Definition: test4.c:529
int test4_payloadlen
Definition: test8.c:703
int test4_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: test8.c:712
void test3_onPublish(void *context, MQTTAsync_successData *response)
Definition: test8.c:510
void test2_onFailure(void *context, MQTTAsync_failureData *response)
Definition: test8.c:400
char *const * serverURIs
Definition: MQTTAsync.h:1277
#define assert(a, b, c, d)
Definition: test8.c:195
START_TIME_TYPE global_start_time
Definition: test8.c:192
MQTTAsync c
Definition: test4.c:525
void test4_onPublish(void *context, MQTTAsync_successData *response)
Definition: test8.c:705
#define MQTTAsync_message_initializer
Definition: MQTTAsync.h:319
enum MQTTReasonCodes rc
Definition: test10.c:1112
int tests
Definition: test8.c:199
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
volatile int test_finished
Definition: test8.c:221
int test1(struct Options options)
Definition: test8.c:333
int test_no
Definition: test1.c:54
int test2_onFailure_called
Definition: test8.c:398
void * test4_payload
Definition: test8.c:702
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1387


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48