sync_client_test.c
Go to the documentation of this file.
1 /*******************************************************************
2  Copyright (c) 2013, 2014 IBM Corp.
3 
4  All rights reserved. This program and the accompanying materials
5  are made available under the terms of the Eclipse Public License v2.0
6  and Eclipse Distribution License v1.0 which accompany this distribution.
7 
8  The Eclipse Public License is available at
9  https://www.eclipse.org/legal/epl-2.0/
10  and the Eclipse Distribution License is available at
11  http://www.eclipse.org/org/documents/edl-v10.php.
12 
13  Contributors:
14  Ian Craggs - initial implementation and/or documentation
15 *******************************************************************/
16 
17 #include "MQTTClient.h"
18 #include <string.h>
19 #include <stdlib.h>
20 
21 #if !defined(_WINDOWS)
22  #include <sys/time.h>
23  #include <sys/socket.h>
24  #include <unistd.h>
25  #include <errno.h>
26 #else
27 #include <winsock2.h>
28 #include <ws2tcpip.h>
29 #define MAXHOSTNAMELEN 256
30 #define EAGAIN WSAEWOULDBLOCK
31 #define EINTR WSAEINTR
32 #define EINPROGRESS WSAEINPROGRESS
33 #define EWOULDBLOCK WSAEWOULDBLOCK
34 #define ENOTCONN WSAENOTCONN
35 #define ECONNRESET WSAECONNRESET
36 #endif
37 
/* Number of elements in a true array (not a pointer). The argument is
   parenthesized so that any array-valued expression expands correctly. */
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
39 
40 
41 char* topics[] = {"TopicA", "TopicA/B", "Topic/C", "TopicA/C", "/TopicA"};
42 char* wildtopics[] = {"TopicA/+", "+/C", "#", "/#", "/+", "+/+", "TopicA/#"};
43 char* nosubscribe_topics[] = {"nosubscribe",};
44 
45 struct Options
46 {
47  char* connection;
48  char* clientid1;
49  char* clientid2;
50  char* username;
51  char* password;
52  int verbose;
57 } options =
58 {
59  "tcp://localhost:1883",
60  "myclientid",
61  "myclientid2",
62  NULL,
63  NULL,
64  0,
66  1,
67  0,
68  0,
69 };
70 
71 
/**
 * Print the list of recognised option names to stdout and terminate the
 * process with EXIT_FAILURE. Does not return.
 */
void usage(void)
{
	static const char help_text[] =
		"options:\n connection, clientid1, clientid2, username, password, MQTTversion, iterations, verbose\n";

	fputs(help_text, stdout);
	exit(EXIT_FAILURE);
}
77 
78 void getopts(int argc, char** argv)
79 {
80  int count = 1;
81 
82  while (count < argc)
83  {
84  if (strcmp(argv[count], "--dollar_topics_test") == 0 || strcmp(argv[count], "--$") == 0)
85  {
87  printf("Running $ topics test\n");
88  }
89  else if (strcmp(argv[count], "--subscribe_failure_test") == 0 || strcmp(argv[count], "-s") == 0)
90  {
92  printf("Running subscribe failure test\n");
93  }
94  else if (strcmp(argv[count], "--connection") == 0)
95  {
96  if (++count < argc)
97  {
98  options.connection = argv[count];
99  printf("Setting connection to %s\n", options.connection);
100  }
101  else
102  usage();
103  }
104  else if (strcmp(argv[count], "--clientid1") == 0)
105  {
106  if (++count < argc)
107  {
108  options.clientid1 = argv[count];
109  printf("Setting clientid1 to %s\n", options.clientid1);
110  }
111  else
112  usage();
113  }
114  else if (strcmp(argv[count], "--clientid2") == 0)
115  {
116  if (++count < argc)
117  {
118  options.clientid2 = argv[count];
119  printf("Setting clientid2 to %s\n", options.clientid2);
120  }
121  else
122  usage();
123  }
124  else if (strcmp(argv[count], "--username") == 0)
125  {
126  if (++count < argc)
127  {
128  options.username = argv[count];
129  printf("Setting username to %s\n", options.username);
130  }
131  else
132  usage();
133  }
134  else if (strcmp(argv[count], "--password") == 0)
135  {
136  if (++count < argc)
137  {
138  options.password = argv[count];
139  printf("Setting password to %s\n", options.password);
140  }
141  else
142  usage();
143  }
144  else if (strcmp(argv[count], "--MQTTversion") == 0)
145  {
146  if (++count < argc)
147  {
148  options.MQTTVersion = atoi(argv[count]);
149  printf("Setting MQTT version to %d\n", options.MQTTVersion);
150  }
151  else
152  usage();
153  }
154  else if (strcmp(argv[count], "--iterations") == 0)
155  {
156  if (++count < argc)
157  {
158  options.iterations = atoi(argv[count]);
159  printf("Setting iterations to %d\n", options.iterations);
160  }
161  else
162  usage();
163  }
164  else if (strcmp(argv[count], "--verbose") == 0)
165  {
166  options.verbose = 1;
167  printf("\nSetting verbose on\n");
168  }
169  count++;
170  }
171 }
172 
173 
174 #if defined(_WIN32) || defined(_WINDOWS)
175 #define msleep Sleep
176 #define START_TIME_TYPE DWORD
177 static DWORD start_time = 0;
179 {
180  return GetTickCount();
181 }
182 #elif defined(AIX)
#define mqsleep sleep
/* The tests in this file call msleep(milliseconds); define it here as well,
   matching the generic POSIX branch, so those calls resolve on this platform. */
#define msleep(A) usleep((A) * 1000)
184 #define START_TIME_TYPE struct timespec
186 {
187  static struct timespec start;
188  clock_gettime(CLOCK_REALTIME, &start);
189  return start;
190 }
191 #else
/* Sleep for A milliseconds. A is parenthesized so that an expression argument
   such as msleep(base + extra) is scaled as a whole. */
#define msleep(A) usleep((A) * 1000)
193 #define START_TIME_TYPE struct timeval
194 /* TODO - unused - remove? static struct timeval start_time; */
196 {
197  struct timeval start_time;
198  gettimeofday(&start_time, NULL);
199  return start_time;
200 }
201 #endif
202 
203 #define LOGA_DEBUG 0
204 #define LOGA_INFO 1
205 #include <stdarg.h>
206 #include <time.h>
207 #include <sys/timeb.h>
208 void MyLog(int LOGA_level, char* format, ...)
209 {
210  static char msg_buf[256];
211  va_list args;
212  struct timeb ts;
213 
214  struct tm *timeinfo;
215 
216  if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
217  return;
218 
219  ftime(&ts);
220  timeinfo = localtime(&ts.time);
221  strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
222 
223  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
224 
225  va_start(args, format);
226  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
227  va_end(args);
228 
229  printf("%s\n", msg_buf);
230  fflush(stdout);
231 }
232 
233 
234 int tests = 0;
235 int failures = 0;
236 
237 
238 void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
239 {
240  ++tests;
241  if (!value)
242  {
243  int count;
244  va_list args;
245 
246  ++failures;
247  printf("Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
248 
249  va_start(args, format);
250  count = vprintf(format, args);
251  va_end(args);
252  if (count)
253  printf("\n");
254 
255  //cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
256  // description, filename, lineno);
257  }
258  else
259  MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
260 }
261 
262 
263 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
264 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
265 
266 typedef struct
267 {
268  char* topicName;
269  int topicLen;
271 } messageStruct;
272 
274 int messageCount = 0;
275 
276 int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
277 {
278  messagesArrived[messageCount].topicName = topicName;
279  messagesArrived[messageCount].topicLen = topicLen;
280  messagesArrived[messageCount++].m = m;
281  MyLog(LOGA_DEBUG, "Callback: %d message received on topic %s is %.*s.",
282  messageCount, topicName, m->payloadlen, (char*)(m->payload));
283  return 1;
284 }
285 
286 
287 void clearMessages(void)
288 {
289  int i;
290 
291  for (i = 0; i < messageCount; ++i)
292  {
293  MQTTClient_free(messagesArrived[i].topicName);
294  MQTTClient_freeMessage(&messagesArrived[i].m);
295  }
296  messageCount = 0;
297 }
298 
299 void cleanup(void)
300 {
301  // clean all client state
302  char* clientids[] = {options.clientid1, options.clientid2};
303  int i, rc;
305  MQTTClient aclient;
306 
307  MyLog(LOGA_INFO, "Cleaning up");
308 
309  opts.keepAliveInterval = 20;
310  opts.cleansession = 1;
311  opts.username = options.username;
312  opts.password = options.password;
314 
315  for (i = 0; i < 2; ++i)
316  {
317  rc = MQTTClient_create(&aclient, options.connection, clientids[i], MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
318  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
319 
320  rc = MQTTClient_connect(aclient, &opts);
321  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
322 
323  rc = MQTTClient_disconnect(aclient, 100);
324  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
325 
326  MQTTClient_destroy(&aclient);
327  }
328 
329  // clean retained messages
331  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
332 
333  rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
334  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
335 
336  rc = MQTTClient_connect(aclient, &opts);
337  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
338 
339  rc = MQTTClient_subscribe(aclient, "#", 0);
340  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
341 
342  msleep(2000); // wait for all retained messages to arrive
343 
344  rc = MQTTClient_unsubscribe(aclient, "#");
345  assert("Good rc from unsubscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
346 
347  for (i = 0; i < messageCount; ++i)
348  {
349  if (messagesArrived[i].m->retained)
350  {
351  MyLog(LOGA_INFO, "Deleting retained message for topic %s", (char*)messagesArrived[i].topicName);
352  rc = MQTTClient_publish(aclient, messagesArrived[i].topicName, 0, "", 0, 1, NULL);
353  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
354  }
355  }
356 
357  rc = MQTTClient_disconnect(aclient, 100);
358  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
359 
360  MQTTClient_destroy(&aclient);
361 
362  clearMessages();
363 
364  MyLog(LOGA_INFO, "Finished cleaning up");
365 }
366 
367 
368 int basic_test(void)
369 {
370  int i, rc;
372  MQTTClient aclient;
373 
374  MyLog(LOGA_INFO, "Starting basic test");
375 
376  tests = failures = 0;
377 
378  opts.keepAliveInterval = 20;
379  opts.cleansession = 1;
380  opts.username = options.username;
381  opts.password = options.password;
383 
385  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
386 
387  rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
388  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
389 
390  rc = MQTTClient_connect(aclient, &opts);
391  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
392 
393  rc = MQTTClient_disconnect(aclient, 100);
394  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
395 
396  rc = MQTTClient_connect(aclient, &opts);
397  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
398 
399  rc = MQTTClient_subscribe(aclient, topics[0], 0);
400  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
401 
402  rc = MQTTClient_publish(aclient, topics[0], 5, "qos 0", 0, 0, NULL);
403  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
404 
405  rc = MQTTClient_publish(aclient, topics[0], 5, "qos 1", 1, 0, NULL);
406  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
407 
408  rc = MQTTClient_publish(aclient, topics[0], 5, "qos 2", 2, 0, NULL);
409  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
410 
411  msleep(1000);
412 
413  rc = MQTTClient_disconnect(aclient, 10000);
414  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
415 
416  assert("3 Messages received", messageCount == 3, "messageCount was %d", messageCount);
417  clearMessages();
418 
419  /*opts.MQTTVersion = MQTTVERSION_3_1;
420  rc = MQTTClient_connect(aclient, &opts); // should fail - wrong protocol version
421  assert("Bad rc from connect", rc == MQTTCLIENT_FAILURE, "rc was %d", rc);*/
422 
423  MQTTClient_destroy(&aclient);
424 
425  MyLog(LOGA_INFO, "Basic test %s", (failures == 0) ? "succeeded" : "failed");
426  return failures;
427 }
428 
429 
430 
432 {
433  int i, rc;
435  MQTTClient aclient;
436  MQTTClient bclient;
437 
438  MyLog(LOGA_INFO, "Offline message queueing test");
439 
440  tests = failures = 0;
441 
442  opts.keepAliveInterval = 20;
443  opts.cleansession = 0;
444  opts.username = options.username;
445  opts.password = options.password;
447 
449  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
450 
451  rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
452  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
453 
454  rc = MQTTClient_connect(aclient, &opts);
455  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
456 
457  rc = MQTTClient_subscribe(aclient, wildtopics[5], 2);
458  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
459 
460  rc = MQTTClient_disconnect(aclient, 100);
461  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
462 
464  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
465 
466  opts.cleansession = 1;
467  rc = MQTTClient_connect(bclient, &opts);
468  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
469 
470  rc = MQTTClient_publish(bclient, topics[1], 5, "qos 0", 0, 0, NULL);
471  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
472 
473  rc = MQTTClient_publish(bclient, topics[2], 5, "qos 1", 1, 0, NULL);
474  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
475 
476  rc = MQTTClient_publish(bclient, topics[3], 5, "qos 2", 2, 0, NULL);
477  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
478 
479  msleep(2000);
480 
481  rc = MQTTClient_disconnect(bclient, 100);
482  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
483 
484  MQTTClient_destroy(&bclient);
485 
486  opts.cleansession = 0;
487  rc = MQTTClient_connect(aclient, &opts);
488  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
489 
490  msleep(1000); // receive the queued messages
491 
492  rc = MQTTClient_disconnect(aclient, 100);
493  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
494 
495  MQTTClient_destroy(&aclient);
496 
497  assert("2 or 3 messages received", messageCount == 3 || messageCount == 2, "messageCount was %d", messageCount);
498 
499  MyLog(LOGA_INFO, "This server %s queueing QoS 0 messages for offline clients", (messageCount == 3) ? "is" : "is not");
500 
501  clearMessages();
502 
503  MyLog(LOGA_INFO, "Offline message queueing test %s", (failures == 0) ? "succeeded" : "failed");
504  return failures;
505 }
506 
507 
509 {
510  int i, rc;
512  MQTTClient aclient;
513 
514  MyLog(LOGA_INFO, "Retained message test");
515 
516  tests = failures = 0;
517 
518  opts.keepAliveInterval = 20;
519  opts.cleansession = 1;
520  opts.username = options.username;
521  opts.password = options.password;
523 
524  assert("0 messages received", messageCount == 0, "messageCount was %d", messageCount);
525 
526  // set retained messages
528  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
529 
530  rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
531  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
532 
533  rc = MQTTClient_connect(aclient, &opts);
534  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
535 
536  rc = MQTTClient_publish(aclient, topics[1], 5, "qos 0", 0, 1, NULL);
537  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
538 
539  rc = MQTTClient_publish(aclient, topics[2], 5, "qos 1", 1, 1, NULL);
540  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
541 
542  rc = MQTTClient_publish(aclient, topics[3], 5, "qos 2", 2, 1, NULL);
543  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
544 
545  msleep(1000);
546 
547  rc = MQTTClient_subscribe(aclient, wildtopics[5], 2);
548  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
549 
550  msleep(2000);
551 
552  rc = MQTTClient_disconnect(aclient, 100);
553  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
554 
555  assert("3 messages received", messageCount == 3, "messageCount was %d", messageCount);
556 
557  for (i = 0; i < messageCount; ++i)
558  {
559  assert("messages should be retained", messagesArrived[i].m->retained, "retained was %d",
560  messagesArrived[i].m->retained);
561  MQTTClient_free(messagesArrived[i].topicName);
562  MQTTClient_freeMessage(&messagesArrived[i].m);
563  }
564  messageCount = 0;
565 
566  // clear retained messages
567  rc = MQTTClient_connect(aclient, &opts);
568  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
569 
570  rc = MQTTClient_publish(aclient, topics[1], 0, "", 0, 1, NULL);
571  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
572 
573  rc = MQTTClient_publish(aclient, topics[2], 0, "", 1, 1, NULL);
574  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
575 
576  rc = MQTTClient_publish(aclient, topics[3], 0, "", 2, 1, NULL);
577  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
578 
579  msleep(200); // wait for QoS 2 exchange to be completed
580  rc = MQTTClient_subscribe(aclient, wildtopics[5], 2);
581  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
582 
583  msleep(200);
584 
585  rc = MQTTClient_disconnect(aclient, 100);
586  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
587 
588  assert("0 messages received", messageCount == 0, "messageCount was %d", messageCount);
589 
590  MQTTClient_destroy(&aclient);
591 
592  MyLog(LOGA_INFO, "Retained message test %s", (failures == 0) ? "succeeded" : "failed");
593  return failures;
594 }
595 
/* Return value of a failed socket call. winsock2.h already provides this on
   Windows, so define it only where it is missing; the value is parenthesized
   so it is safe inside larger expressions. */
#if !defined(SOCKET_ERROR)
#define SOCKET_ERROR (-1)
#endif
597 
/**
 * Report the error left behind by a failed socket call.
 *
 * The error code is captured once on entry (WSAGetLastError() on Windows,
 * errno elsewhere), so the value that is tested, printed and returned is the
 * one set by the failing call even if printf changes errno afterwards.
 *
 * Nothing is printed for EINTR, EAGAIN, EINPROGRESS or EWOULDBLOCK, nor for
 * ENOTCONN / ECONNRESET when aString is "shutdown".
 *
 * @param aString name of the socket operation that failed (e.g. "shutdown")
 * @param sock    the socket the operation was applied to
 * @return the captured error code
 */
int test6_socket_error(char* aString, int sock)
{
#if defined(_WIN32)
	const int err = WSAGetLastError();
#else
	const int err = errno;
#endif

	if (err != EINTR && err != EAGAIN && err != EINPROGRESS && err != EWOULDBLOCK)
	{
		if (strcmp(aString, "shutdown") != 0 || (err != ENOTCONN && err != ECONNRESET))
			printf("Socket error %d in %s for socket %d", err, aString, sock);
	}
	return err;
}
614 
616 {
617  int rc;
618 
619 #if defined(_WIN32)
620  if (shutdown(socket, SD_BOTH) == SOCKET_ERROR)
621  test6_socket_error("shutdown", socket);
622  if ((rc = closesocket(socket)) == SOCKET_ERROR)
623  test6_socket_error("close", socket);
624 #else
625  if (shutdown(socket, SHUT_RDWR) == SOCKET_ERROR)
626  test6_socket_error("shutdown", socket);
627  if ((rc = close(socket)) == SOCKET_ERROR)
628  test6_socket_error("close", socket);
629 #endif
630  return rc;
631 }
632 
633 typedef struct
634 {
635  int socket;
636  time_t lastContact;
637 #if defined(OPENSSL)
638  SSL* ssl;
639  SSL_CTX* ctx;
640 #endif
642 
643 
644 typedef struct
645 {
646  char* clientID;
647  char* username;
648  char* password;
649  unsigned int cleansession : 1;
650  unsigned int connected : 1;
651  unsigned int good : 1;
652  unsigned int ping_outstanding : 1;
653  int connect_state : 4;
654  networkHandles net;
655 /* ... */
656 } Clients;
657 
658 
659 typedef struct
660 {
661  char* serverURI;
662  Clients* c;
666  void* context;
667 
669  int rc; /* getsockopt return code in connect */
673  void* pack;
674 } MQTTClients;
675 
676 
678 {
679  int i, rc, count = 0;
682  MQTTClient aclient, bclient;
683 
684  MyLog(LOGA_INFO, "Will message test");
685 
686  tests = failures = 0;
687 
688  opts.keepAliveInterval = 2;
689  opts.cleansession = 1;
690  opts.username = options.username;
691  opts.password = options.password;
693 
694  opts.will = &wopts;
695  opts.will->message = "client not disconnected";
696  opts.will->qos = 1;
697  opts.will->retained = 0;
698  opts.will->topicName = topics[2];
699 
701  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
702 
703  rc = MQTTClient_connect(aclient, &opts);
704  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
705 
707  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
708 
709  rc = MQTTClient_setCallbacks(bclient, NULL, NULL, messageArrived, NULL);
710  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
711 
712  opts.keepAliveInterval = 20;
713  opts.will = NULL;
714  rc = MQTTClient_connect(bclient, &opts);
715  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
716 
717  rc = MQTTClient_subscribe(bclient, topics[2], 2);
718  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
719 
720  msleep(100);
721 
722  test6_socket_close(((MQTTClients*)aclient)->c->net.socket);
723 
724  while (messageCount == 0 && ++count < 10)
725  msleep(1000);
726 
727  rc = MQTTClient_disconnect(bclient, 100);
728  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
729 
730  MQTTClient_destroy(&bclient);
731 
732  assert("will message received", messageCount == 1, "messageCount was %d", messageCount);
733 
734  rc = MQTTClient_disconnect(aclient, 100);
735 
736  MQTTClient_destroy(&aclient);
737 
738  MyLog(LOGA_INFO, "Will message test %s", (failures == 0) ? "succeeded" : "failed");
739  return failures;
740 }
741 
742 
744 {
745  /* overlapping subscriptions. When there is more than one matching subscription for the same client for a topic,
746  the server may send back one message with the highest QoS of any matching subscription, or one message for
747  each subscription with a matching QoS. */
748 
749  int i, rc;
751  MQTTClient aclient;
752  char* topicList[] = {wildtopics[6], wildtopics[0]};
753  int qosList[] = {2, 1};
754 
755  MyLog(LOGA_INFO, "Starting overlapping subscriptions test");
756 
757  clearMessages();
758  tests = failures = 0;
759 
760  opts.keepAliveInterval = 20;
761  opts.cleansession = 1;
762  opts.username = options.username;
763  opts.password = options.password;
765 
767  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
768 
769  rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
770  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
771 
772  rc = MQTTClient_connect(aclient, &opts);
773  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
774 
775  rc = MQTTClient_subscribeMany(aclient, 2, topicList, qosList);
776  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
777 
778  rc = MQTTClient_publish(aclient, topics[3], strlen("overlapping topic filters") + 1,
779  "overlapping topic filters", 2, 0, NULL);
780  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
781 
782  msleep(1000);
783 
784  assert("1 or 2 messages received", messageCount == 1 || messageCount == 2, "messageCount was %d", messageCount);
785 
786  if (messageCount == 1)
787  {
788  MyLog(LOGA_INFO, "This server is publishing one message for all matching overlapping subscriptions, not one for each.");
789  assert("QoS should be 2", messagesArrived[0].m->qos == 2, "QoS was %d", messagesArrived[0].m->qos);
790  }
791  else
792  {
793  MyLog(LOGA_INFO, "This server is publishing one message per each matching overlapping subscription.");
794  assert1("QoSs should be 1 and 2",
795  (messagesArrived[0].m->qos == 2 && messagesArrived[1].m->qos == 1) ||
796  (messagesArrived[0].m->qos == 1 && messagesArrived[1].m->qos == 2),
797  "QoSs were %d %d", messagesArrived[0].m->qos, messagesArrived[1].m->qos);
798  }
799 
800  rc = MQTTClient_disconnect(aclient, 100);
801 
802  MQTTClient_destroy(&aclient);
803 
804  MyLog(LOGA_INFO, "Overlapping subscription test %s", (failures == 0) ? "succeeded" : "failed");
805  return failures;
806 }
807 
808 
809 int keepalive_test(void)
810 {
811  /* keepalive processing. We should be kicked off by the server if we don't send or receive any data, and don't send
812  any pings either. */
813 
814  int i, rc, count = 0;
817  MQTTClient aclient, bclient;
818 
819  MyLog(LOGA_INFO, "Starting keepalive test");
820 
821  tests = failures = 0;
822  clearMessages();
823 
824  opts.cleansession = 1;
825  opts.username = options.username;
826  opts.password = options.password;
828 
829  opts.will = &wopts;
830  opts.will->message = "keepalive expiry";
831  opts.will->qos = 1;
832  opts.will->retained = 0;
833  opts.will->topicName = topics[4];
834 
835  opts.keepAliveInterval = 20;
837  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
838 
839  rc = MQTTClient_setCallbacks(bclient, NULL, NULL, messageArrived, NULL);
840  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
841 
842  rc = MQTTClient_connect(bclient, &opts);
843  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
844 
845  rc = MQTTClient_subscribe(bclient, topics[4], 2);
846  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
847 
848  opts.keepAliveInterval = 2;
850  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
851 
852  rc = MQTTClient_connect(aclient, &opts);
853  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
854 
855  while (messageCount == 0 && ++count < 20)
856  msleep(1000);
857 
858  rc = MQTTClient_disconnect(bclient, 100);
859  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
860 
861  assert("Should have will message", messageCount == 1, "messageCount was %d", messageCount);
862 
863  rc = MQTTClient_disconnect(aclient, 100);
864 
865  MQTTClient_destroy(&aclient);
866 
867  MyLog(LOGA_INFO, "Keepalive test %s", (failures == 0) ? "succeeded" : "failed");
868  return failures;
869 }
870 
871 
872 
874 {
875  /* redelivery on reconnect. When a QoS 1 or 2 exchange has not been completed, the server should retry the
876  appropriate MQTT packets */
877 
878  int i, rc, count = 0;
880  MQTTClient aclient;
881 
882  MyLog(LOGA_INFO, "Starting redelivery on reconnect test");
883 
884  tests = failures = 0;
885  clearMessages();
886 
887  opts.keepAliveInterval = 0;
888  opts.cleansession = 0;
889  opts.username = options.username;
890  opts.password = options.password;
892 
894  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
895 
896  rc = MQTTClient_connect(aclient, &opts);
897  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
898 
899  rc = MQTTClient_subscribe(aclient, wildtopics[6], 2);
900  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
902 
903  // no background processing because no callback has been set
904  rc = MQTTClient_publish(aclient, topics[1], 6, "qos 1", 2, 0, NULL);
905  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
906 
907  rc = MQTTClient_publish(aclient, topics[3], 6, "qos 2", 2, 0, NULL);
908  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
909 
910  rc = MQTTClient_disconnect(aclient, 0);
911 
912  assert("No messages should have been received yet", messageCount == 0, "messageCount was %d", messageCount);
913 
914  rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
915  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
916 
917  rc = MQTTClient_connect(aclient, &opts);
918  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
919 
920  while (messageCount < 2 && ++count < 5)
921  msleep(1000);
922 
923  assert("Should have 2 messages", messageCount == 2, "messageCount was %d", messageCount);
924 
925  rc = MQTTClient_disconnect(aclient, 100);
926 
927  MQTTClient_destroy(&aclient);
928 
929  MyLog(LOGA_INFO, "Redelivery on reconnect test %s", (failures == 0) ? "succeeded" : "failed");
930  return failures;
931 }
932 
933 
934 
936 {
937  int i, rc, count = 0;
939  MQTTClient aclient;
940 
941  MyLog(LOGA_INFO, "Starting zero length clientid test");
942 
943  tests = failures = 0;
944  clearMessages();
945 
946  opts.keepAliveInterval = 0;
947  opts.cleansession = 0;
948  opts.username = options.username;
949  opts.password = options.password;
951 
953  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
954 
955  rc = MQTTClient_connect(aclient, &opts);
956  assert("rc 2 from connect", rc == 2, "rc was %d", rc); // this should always fail
957 
958  opts.cleansession = 1;
959  rc = MQTTClient_connect(aclient, &opts);
960  assert("Connack rc should be 0 or 2", rc == MQTTCLIENT_SUCCESS || rc == 2, "rc was %d", rc);
961 
962  MyLog(LOGA_INFO, "This server %s support zero length clientids", (rc == 2) ? "does not" : "does");
963 
964  if (rc == MQTTCLIENT_SUCCESS)
965  rc = MQTTClient_disconnect(aclient, 100);
966 
967  MQTTClient_destroy(&aclient);
968 
969  MyLog(LOGA_INFO, "Zero length clientid test %s", (failures == 0) ? "succeeded" : "failed");
970  return failures;
971 }
972 
973 
975 {
976  /* $ topics. The specification says that a topic filter which starts with a wildcard does not match topic names that
977  begin with a $. Publishing to a topic which starts with a $ may not be allowed on some servers (which is entirely valid),
978  so this test will not work and should be omitted in that case.
979  */
980  int i, rc, count = 0;
982  MQTTClient aclient;
983  char dollartopic[20];
984 
985  MyLog(LOGA_INFO, "Starting $ topics test");
986 
987  sprintf(dollartopic, "$%s", topics[1]);
988 
989  clearMessages();
990 
991  opts.keepAliveInterval = 5;
992  opts.cleansession = 1;
993  opts.username = options.username;
994  opts.password = options.password;
996 
998  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
999 
1000  rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
1001  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1002 
1003  rc = MQTTClient_connect(aclient, &opts);
1004  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1005 
1006  rc = MQTTClient_subscribe(aclient, wildtopics[5], 2);
1007  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1008 
1009  msleep(1000); // wait for any retained messages, hopefully
1010  clearMessages();
1011 
1012  rc = MQTTClient_publish(aclient, topics[1], 20, "not sent to dollar topic", 1, 0, NULL);
1013  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1014 
1015  rc = MQTTClient_publish(aclient, dollartopic, 20, "sent to dollar topic", 1, 0, NULL);
1016  assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1017 
1018  msleep(1000);
1019  assert("Should have 1 message", messageCount == 1, "messageCount was %d", messageCount);
1020 
1021  rc = MQTTClient_disconnect(aclient, 100);
1022  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1023 
1024  MQTTClient_destroy(&aclient);
1025 
1026  MyLog(LOGA_INFO, "$ topics test %s", (failures == 0) ? "succeeded" : "failed");
1027  return failures;
1028 }
1029 
1030 
1032 {
1033  /* Subscribe failure. A new feature of MQTT 3.1.1 is the ability to send back negative responses to subscribe
1034  requests. One way of doing this is to subscribe to a topic which is not allowed to be subscribed to.
1035  */
1036  int i, rc, count = 0;
1038  MQTTClient aclient;
1039  int subqos = 2;
1040 
1041  MyLog(LOGA_INFO, "Starting subscribe failure test");
1042 
1043  clearMessages();
1044 
1045  opts.keepAliveInterval = 5;
1046  opts.cleansession = 1;
1047  opts.username = options.username;
1048  opts.password = options.password;
1050 
1052  assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
1053 
1054  rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
1055  assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1056 
1057  rc = MQTTClient_connect(aclient, &opts);
1058  assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1059 
1060  rc = MQTTClient_subscribeMany(aclient, 1, &nosubscribe_topics[0], &subqos);
1061  assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1062  assert("0x80 rc from subscribe", subqos == 0x80, "subqos was %d", subqos);
1063 
1064  rc = MQTTClient_disconnect(aclient, 100);
1065  assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
1066 
1067  MQTTClient_destroy(&aclient);
1068 
1069  MyLog(LOGA_INFO, "Subscribe failure test %s", (failures == 0) ? "succeeded" : "failed");
1070  return failures;
1071 }
1072 
1073 
1074 int main(int argc, char** argv)
1075 {
1076  int i;
1077  int all_failures = 0;
1078 
1079  getopts(argc, argv);
1080 
1081  for (i = 0; i < options.iterations; ++i)
1082  {
1083  cleanup();
1084  all_failures += basic_test() +
1087  will_message_test() +
1089  keepalive_test() +
1092 
1094  all_failures += dollar_topics_test();
1095 
1097  all_failures += subscribe_failure_test();
1098  }
1099 
1100  MyLog(LOGA_INFO, "Test suite %s", (all_failures == 0) ? "succeeded" : "failed");
1101 }
1102 
1103 
1104 
1105 
1106 
1107 
1108 
1109 
1110 
1111 
1112 
1113 
1114 
1115 
1116 
1117 
1118 
1119 
1120 
1121 
1122 
enum MQTTPropertyCodes value
int overlapping_subscriptions_test(void)
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
#define MQTTCLIENT_SUCCESS
Definition: MQTTClient.h:131
void getopts(int argc, char **argv)
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions *options)
Definition: MQTTClient.c:1644
int tests
int MQTTClient_publish(MQTTClient handle, const char *topicName, int payloadlen, const void *payload, int qos, int retained, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2387
int MQTTClient_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
Definition: MQTTClient.h:359
char * connection
void MQTTClient_connectionLost(void *context, char *cause)
Definition: MQTTClient.h:398
int MQTTClient_disconnect(MQTTClient handle, int timeout)
Definition: MQTTClient.c:1908
int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *m)
char * password
int retained_message_test(void)
int offline_message_queueing_test(void)
int test6_socket_error(char *aString, int sock)
struct pubsub_opts opts
Definition: paho_c_pub.c:42
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
std::tm localtime(std::time_t time)
Definition: chrono.h:292
void MyLog(int LOGA_level, char *format,...)
static char msg_buf[512]
Definition: Log.c:122
#define START_TIME_TYPE
#define MQTTClient_willOptions_initializer
Definition: MQTTClient.h:639
int basic_test(void)
int MQTTClient_setCallbacks(MQTTClient handle, void *context, MQTTClient_connectionLost *cl, MQTTClient_messageArrived *ma, MQTTClient_deliveryComplete *dc)
Definition: MQTTClient.c:1032
char * nosubscribe_topics[]
constexpr size_t count()
Definition: core.h:960
description
Definition: setup.py:19
int MQTTClient_unsubscribe(MQTTClient handle, const char *topic)
Definition: MQTTClient.c:2239
void cleanup(void)
#define MQTTVERSION_3_1_1
Definition: MQTTAsync.h:203
#define LOGA_DEBUG
char * clientid2
int messageCount
const char * topicName
Definition: MQTTClient.h:619
void MQTTClient_freeMessage(MQTTClient_message **message)
Definition: MQTTClient.c:601
int main(int argc, char **argv)
int keepalive_test(void)
char * clientid1
void * MQTTClient
Definition: MQTTClient.h:246
int subscribe_failure_test(void)
void MQTTClient_yield(void)
Definition: MQTTClient.c:2730
int zero_length_clientid_test(void)
void MQTTClient_destroy(MQTTClient *handle)
Definition: MQTTClient.c:556
void clearMessages(void)
int dollar_topics_test(void)
#define MQTTCLIENT_PERSISTENCE_DEFAULT
void MQTTClient_deliveryComplete(void *context, MQTTClient_deliveryToken dt)
Definition: MQTTClient.h:381
void myassert(char *filename, int lineno, char *description, int value, char *format,...)
messageStruct messagesArrived[1000]
#define assert(a, b, c, d)
MQTTClient_message * m
int MQTTClient_subscribe(MQTTClient handle, const char *topic, int qos)
Definition: MQTTClient.c:2104
char * wildtopics[]
int test6_socket_close(int socket)
#define msleep(A)
MQTTClient c
Definition: test10.c:1656
dictionary context
Definition: test2.py:57
void MQTTClient_free(void *memory)
Definition: MQTTClient.c:612
char * password
#define MQTTClient_connectOptions_initializer
Definition: MQTTClient.h:953
#define LOGA_INFO
int MQTTClient_subscribeMany(MQTTClient handle, int count, char *const *topic, int *qos)
Definition: MQTTClient.c:2075
void usage(void)
int run_subscribe_failure_test
int will_message_test(void)
char * username
char * username
#define assert1(a, b, c, d, e)
volatile int connected
int run_dollar_topics_test
char * topics[]
int redelivery_on_reconnect_test(void)
const char * message
Definition: MQTTClient.h:621
enum MQTTReasonCodes rc
Definition: test10.c:1112
int failures
#define SOCKET_ERROR
int MQTTClient_create(MQTTClient *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTClient.c:507
MQTTClient_willOptions * will
Definition: MQTTClient.h:866
START_TIME_TYPE start_clock(void)
struct Options options


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48