test_mqtt4async.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2014 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs - MQTT 3.1.1 support
16  *******************************************************************************/
17 
18 
25 /*
26 #if !defined(_RTSHEADER)
27  #include <rts.h>
28 #endif
29 */
30 
31 #include "MQTTAsync.h"
32 #include <string.h>
33 #include <stdlib.h>
34 
35 #if !defined(_WINDOWS)
36  #include <sys/time.h>
37  #include <sys/socket.h>
38  #include <unistd.h>
39  #include <errno.h>
40 #else
41 #include <winsock2.h>
42 #include <ws2tcpip.h>
43 #define MAXHOSTNAMELEN 256
44 #define EAGAIN WSAEWOULDBLOCK
45 #define EINTR WSAEINTR
46 #define EINPROGRESS WSAEINPROGRESS
47 #define EWOULDBLOCK WSAEWOULDBLOCK
48 #define ENOTCONN WSAENOTCONN
49 #define ECONNRESET WSAECONNRESET
50 #define setenv(a, b, c) _putenv_s(a, b)
51 #endif
52 
53 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
54 
/**
 * Report a command-line error and stop the program.
 *
 * Writes the (very terse) help text to standard output and terminates the
 * process with EXIT_FAILURE; it never returns to the caller.
 */
void usage(void)
{
	fputs("help!!\n", stdout);
	exit(EXIT_FAILURE);
}
60 
/**
 * Settings for a test run, filled in from the command line by getopts().
 */
struct Options
{
	char* connection;      /* server address handed to MQTTAsync_create() */
	char** haconnections;  /* optional list of server URIs, NULL when not given */
	int hacount;           /* number of entries in haconnections */
	int verbose;           /* non-zero lets MyLog() print LOGA_DEBUG messages */
	int test_no;           /* index of the test to run; 0 means run all tests */
	int iterations;        /* how many times main() repeats the selected tests */
};

/* Defaults used when the corresponding option is absent from the command line. */
struct Options options = { "m2m.eclipse.org:1883", NULL, 0, 0, 0, 1 };
78 
79 void getopts(int argc, char** argv)
80 {
81  int count = 1;
82 
83  while (count < argc)
84  {
85  if (strcmp(argv[count], "--test_no") == 0)
86  {
87  if (++count < argc)
88  options.test_no = atoi(argv[count]);
89  else
90  usage();
91  }
92  else if (strcmp(argv[count], "--connection") == 0)
93  {
94  if (++count < argc)
95  {
96  options.connection = argv[count];
97  printf("\nSetting connection to %s\n", options.connection);
98  }
99  else
100  usage();
101  }
102  else if (strcmp(argv[count], "--haconnections") == 0)
103  {
104  if (++count < argc)
105  {
106  char* tok = strtok(argv[count], " ");
107  options.hacount = 0;
108  options.haconnections = malloc(sizeof(char*) * 5);
109  while (tok)
110  {
111  options.haconnections[options.hacount] = malloc(strlen(tok) + 1);
112  strcpy(options.haconnections[options.hacount], tok);
113  options.hacount++;
114  tok = strtok(NULL, " ");
115  }
116  }
117  else
118  usage();
119  }
120  else if (strcmp(argv[count], "--iterations") == 0)
121  {
122  if (++count < argc)
123  options.iterations = atoi(argv[count]);
124  else
125  usage();
126  }
127  else if (strcmp(argv[count], "--verbose") == 0)
128  {
129  options.verbose = 1;
130  printf("\nSetting verbose on\n");
131  }
132  count++;
133  }
134 }
135 
136 
137 #define LOGA_DEBUG 0
138 #define LOGA_INFO 1
139 #include <stdarg.h>
140 #include <time.h>
141 #include <sys/timeb.h>
142 void MyLog(int LOGA_level, char* format, ...)
143 {
144  static char msg_buf[256];
145  va_list args;
146  struct timeb ts;
147 
148  struct tm *timeinfo;
149 
150  if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
151  return;
152 
153  ftime(&ts);
154  timeinfo = localtime(&ts.time);
155  strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
156 
157  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
158 
159  va_start(args, format);
160  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
161  va_end(args);
162 
163  printf("%s\n", msg_buf);
164  fflush(stdout);
165 }
166 
167 
168 #if defined(_WIN32) || defined(_WINDOWS)
169 #define mqsleep(A) Sleep(1000*A)
170 #define START_TIME_TYPE DWORD
171 static DWORD start_time = 0;
173 {
174  return GetTickCount();
175 }
176 #elif defined(AIX)
177 #define mqsleep sleep
178 #define START_TIME_TYPE struct timespec
180 {
181  static struct timespec start;
182  clock_gettime(CLOCK_REALTIME, &start);
183  return start;
184 }
185 #else
186 #define mqsleep sleep
187 #define START_TIME_TYPE struct timeval
188 /* TODO - unused - remove? static struct timeval start_time; */
190 {
191  struct timeval start_time;
192  gettimeofday(&start_time, NULL);
193  return start_time;
194 }
195 #endif
196 
197 
198 #if defined(_WIN32)
199 long elapsed(START_TIME_TYPE start_time)
200 {
201  return GetTickCount() - start_time;
202 }
203 #elif defined(AIX)
204 #define assert(a)
205 long elapsed(struct timespec start)
206 {
207  struct timespec now, res;
208 
209  clock_gettime(CLOCK_REALTIME, &now);
210  ntimersub(now, start, res);
211  return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
212 }
213 #else
214 long elapsed(START_TIME_TYPE start_time)
215 {
216  struct timeval now, res;
217 
218  gettimeofday(&now, NULL);
219  timersub(&now, &start_time, &res);
220  return (res.tv_sec)*1000 + (res.tv_usec)/1000;
221 }
222 #endif
223 
224 
225 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
226 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
227 
228 int tests = 0;
229 int failures = 0;
230 FILE* xml;
232 char output[3000];
235 
236 
238 {
239  long duration = elapsed(global_start_time);
240 
241  fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
242  if (cur_output != output)
243  {
244  fprintf(xml, "%s", output);
245  cur_output = output;
246  }
247  fprintf(xml, "</testcase>\n");
248 }
249 
250 
251 void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
252 {
253  ++tests;
254  if (!value)
255  {
256  va_list args;
257 
258  ++failures;
259  MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
260 
261  va_start(args, format);
262  vprintf(format, args);
263  va_end(args);
264 
265  cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
266  description, filename, lineno);
267  }
268  else
269  MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
270 }
271 
272 
274 {
275  MQTTAsync c = (MQTTAsync)context;
276 
277  MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
278  test_finished = 1;
279 }
280 
281 
283 {
284  MQTTAsync c = (MQTTAsync)context;
286  int rc;
287 
288  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
290  opts.context = c;
291 
292  assert("Correct serverURI returned", strstr(response->alt.connect.serverURI, options.connection) != NULL,
293  "serverURI was %s", response->alt.connect.serverURI);
294  assert("Correct MQTTVersion returned", response->alt.connect.MQTTVersion == 4,
295  "MQTTVersion was %d", response->alt.connect.MQTTVersion);
296  assert("Correct sessionPresent returned", response->alt.connect.sessionPresent == 1,
297  "sessionPresent was %d", response->alt.connect.sessionPresent);
298 
299  rc = MQTTAsync_disconnect(c, &opts);
300  assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
301 }
302 
303 
305 {
306  MQTTAsync c = (MQTTAsync)context;
308  int rc;
309 
310  MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
311 
312  opts.MQTTVersion = 4;
313  opts.cleansession = 0;
314  if (options.haconnections != NULL)
315  {
318  }
320  opts.onFailure = NULL;
321  opts.context = c;
322 
323  opts.cleansession = 0;
324  rc = MQTTAsync_connect(c, &opts);
325  assert("Connect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
326 }
327 
328 
330 {
331  MQTTAsync c = (MQTTAsync)context;
333  int rc;
334 
335  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
337  opts.context = c;
338 
339  assert("Correct serverURI returned", strcmp(response->alt.connect.serverURI, options.connection) == 0,
340  "serverURI was %s", response->alt.connect.serverURI);
341  assert("Correct MQTTVersion returned", response->alt.connect.MQTTVersion == 4,
342  "MQTTVersion was %d", response->alt.connect.MQTTVersion);
343  assert("Correct sessionPresent returned", response->alt.connect.sessionPresent == 0,
344  "sessionPresent was %d", response->alt.connect.sessionPresent);
345 
346  rc = MQTTAsync_disconnect(c, &opts);
347  assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
348 }
349 
350 
352 {
353  MQTTAsync c = (MQTTAsync)context;
355  int rc;
356 
357  MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
358 
359  opts.MQTTVersion = 4;
360  opts.cleansession = 0;
361  if (options.haconnections != NULL)
362  {
365  }
367  opts.onFailure = NULL;
368  opts.context = c;
369 
370  opts.cleansession = 0;
371  rc = MQTTAsync_connect(c, &opts);
372  assert("Connect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
373 }
374 
375 
377 {
378  MQTTAsync c = (MQTTAsync)context;
380  int rc;
381 
382  MyLog(LOGA_DEBUG, "In connect onSuccess callback 1, context %p", context);
384  opts.context = c;
385 
386  assert("Correct serverURI returned", strcmp(response->alt.connect.serverURI, options.connection) == 0,
387  "serverURI was %s", response->alt.connect.serverURI);
388  assert("Correct MQTTVersion returned", response->alt.connect.MQTTVersion == 4,
389  "MQTTVersion was %d", response->alt.connect.MQTTVersion);
390  assert("Correct sessionPresent returned", response->alt.connect.sessionPresent == 0,
391  "sessionPresent was %d", response->alt.connect.sessionPresent);
392 
393  rc = MQTTAsync_disconnect(c, &opts);
394  assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
395 }
396 
397 
398 /*********************************************************************
399 
400 Test1: sessionPresent
401 
402 *********************************************************************/
403 int test1(struct Options options)
404 {
405  MQTTAsync c;
407  int rc = 0;
408  char* test_topic = "C client test1";
409 
410  fprintf(xml, "<testcase classname=\"test1\" name=\"sessionPresent\"");
412  test_finished = failures = 0;
413  MyLog(LOGA_INFO, "Starting test 1 - sessionPresent");
414 
415  rc = MQTTAsync_create(&c, options.connection, "sesssionPresent",
417  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
418  if (rc != MQTTASYNC_SUCCESS)
419  {
420  MQTTAsync_destroy(&c);
421  goto exit;
422  }
423 
424  opts.MQTTVersion = 4;
425  if (options.haconnections != NULL)
426  {
427  opts.serverURIs = options.haconnections;
428  opts.serverURIcount = options.hacount;
429  }
431  opts.onFailure = NULL;
432  opts.context = c;
433 
434  /* Connect cleansession */
435  opts.cleansession = 1;
436  MyLog(LOGA_DEBUG, "Connecting");
437  rc = MQTTAsync_connect(c, &opts);
438  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
439  if (rc != MQTTASYNC_SUCCESS)
440  goto exit;
441 
442  while (!test_finished)
443  #if defined(_WIN32)
444  Sleep(100);
445  #else
446  usleep(10000L);
447  #endif
448 
449  MQTTAsync_destroy(&c);
450 
451 exit:
452  MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
453  (failures == 0) ? "passed" : "failed", tests, failures);
455  return failures;
456 }
457 
458 
460 {
461  MQTTAsync c = (MQTTAsync)context;
462 
463  MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
464  test_finished = 1;
465 }
466 
467 
469 {
470  MQTTAsync c = (MQTTAsync)context;
472  int rc;
473 
474  MyLog(LOGA_DEBUG, "In subscribe onFailure callback, context %p", context);
475 
476  assert("Correct subscribe return code", response->code == MQTT_BAD_SUBSCRIBE,
477  "qos was %d", response->code);
478 
480  rc = MQTTAsync_disconnect(c, &opts);
481  assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
482 }
483 
484 
486 {
487  MQTTAsync c = (MQTTAsync)context;
489  int rc;
490 
491  MyLog(LOGA_DEBUG, "In subscribe onSuccess callback, context %p", context);
492 
493  assert("Correct subscribe return code", response->alt.qos == 2,
494  "qos was %d", response->alt.qos);
495 }
496 
497 
499 {
500  MQTTAsync c = (MQTTAsync)context;
502  int rc;
503 
504  MyLog(LOGA_DEBUG, "In connect onSuccess callback 1, context %p", context);
505 
506  assert("Correct serverURI returned", strcmp(response->alt.connect.serverURI, options.connection) == 0,
507  "serverURI was %s", response->alt.connect.serverURI);
508  assert("Correct MQTTVersion returned", response->alt.connect.MQTTVersion == 4,
509  "MQTTVersion was %d", response->alt.connect.MQTTVersion);
510  assert("Correct sessionPresent returned", response->alt.connect.sessionPresent == 0,
511  "sessionPresent was %d", response->alt.connect.sessionPresent);
512 
513  MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
514  opts.context = c;
515 
517  rc = MQTTAsync_subscribe(c, "a topic I can subscribe to", 2, &opts);
518  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
519  if (rc != MQTTASYNC_SUCCESS)
520  test_finished = 1;
521 
522  opts.onSuccess = NULL;
524  rc = MQTTAsync_subscribe(c, "nosubscribe", 2, &opts);
525  assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
526  if (rc != MQTTASYNC_SUCCESS)
527  test_finished = 1;
528 }
529 
530 
531 
532 /*********************************************************************
533 
534 Test2: 0x80 from subscribe
535 
536 *********************************************************************/
537 int test2(struct Options options)
538 {
539  MQTTAsync c;
541  int rc = 0;
542  char* test_topic = "C client test1";
543 
544  fprintf(xml, "<testcase classname=\"test2\" name=\"bad subscribe\"");
546  test_finished = failures = 0;
547  MyLog(LOGA_INFO, "Starting test 2 - bad subscribe");
548 
549  rc = MQTTAsync_create(&c, options.connection, "badSubscribe test",
551  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
552  if (rc != MQTTASYNC_SUCCESS)
553  {
554  MQTTAsync_destroy(&c);
555  goto exit;
556  }
557 
558  opts.MQTTVersion = 4;
559  if (options.haconnections != NULL)
560  {
561  opts.serverURIs = options.haconnections;
562  opts.serverURIcount = options.hacount;
563  }
565  opts.onFailure = NULL;
566  opts.context = c;
567 
568  /* Connect cleansession */
569  opts.cleansession = 1;
570  MyLog(LOGA_DEBUG, "Connecting");
571  rc = MQTTAsync_connect(c, &opts);
572  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
573  if (rc != MQTTASYNC_SUCCESS)
574  goto exit;
575 
576  while (!test_finished)
577  #if defined(_WIN32)
578  Sleep(100);
579  #else
580  usleep(10000L);
581  #endif
582 
583  MQTTAsync_destroy(&c);
584 
585 exit:
586  MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.",
587  (failures == 0) ? "passed" : "failed", tests, failures);
589  return failures;
590 }
591 
592 
593 
594 
595 int main(int argc, char** argv)
596 {
597  int rc = 0;
598  int (*tests[])() = {NULL, test1, test2};
599  int i;
600 
601  xml = fopen("TEST-MQTT4sync.xml", "w");
602  fprintf(xml, "<testsuite name=\"test-mqtt4sync\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
603 
604  setenv("MQTT_C_CLIENT_TRACE", "ON", 1);
605  setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 1);
606 
607  getopts(argc, argv);
608 
609  for (i = 0; i < options.iterations; ++i)
610  {
611  if (options.test_no == 0)
612  { /* run all the tests */
614  rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
615  }
616  else
617  rc = tests[options.test_no](options); /* run just the selected test */
618  }
619 
620  if (rc == 0)
621  MyLog(LOGA_INFO, "verdict pass");
622  else
623  MyLog(LOGA_INFO, "verdict fail");
624 
625  fprintf(xml, "</testsuite>\n");
626  fclose(xml);
627  return rc;
628 }
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
void test1_onConnect2(void *context, MQTTAsync_successData *response)
FILE * xml
void myassert(char *filename, int lineno, char *description, int value, char *format,...)
int tests
enum MQTTPropertyCodes value
char ** haconnections
Definition: test1.c:50
void test1_onConnect3(void *context, MQTTAsync_successData *response)
struct Options options
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
union MQTTAsync_successData::@46 alt
void getopts(int argc, char **argv)
#define LOGA_DEBUG
int test2(struct Options options)
char * connection
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
#define START_TIME_TYPE
void test1_onDisconnect1(void *context, MQTTAsync_successData *response)
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
Definition: test2.py:1
struct pubsub_opts opts
Definition: paho_c_pub.c:42
void * MQTTAsync
Definition: MQTTAsync.h:239
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
#define malloc(x)
Definition: Heap.h:41
#define assert(a, b, c, d)
int test1(struct Options options)
std::tm localtime(std::time_t time)
Definition: chrono.h:292
struct MQTTAsync_successData::@46::@48 connect
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
static char msg_buf[512]
Definition: Log.c:122
void test2_onSubscribe2(void *context, MQTTAsync_failureData *response)
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4121
void test2_onConnect1(void *context, MQTTAsync_successData *response)
char * cur_output
constexpr size_t count()
Definition: core.h:960
int hacount
Definition: test1.c:52
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTAsync.c:737
#define LOGA_INFO
void write_test_result(void)
void test1_onDisconnect2(void *context, MQTTAsync_successData *response)
void MyLog(int LOGA_level, char *format,...)
description
Definition: setup.py:19
#define MQTTAsync_disconnectOptions_initializer
Definition: MQTTAsync.h:1422
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:702
void usage(void)
START_TIME_TYPE global_start_time
int main(int argc, char **argv)
#define MQTTAsync_connectOptions_initializer
Definition: MQTTAsync.h:1335
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
#define ARRAY_SIZE(a)
#define MQTTCLIENT_PERSISTENCE_DEFAULT
#define MQTT_BAD_SUBSCRIBE
Definition: MQTTAsync.h:211
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
void test2_onSubscribe1(void *context, MQTTAsync_successData *response)
char * test_topic
Definition: test11.c:307
MQTTClient c
Definition: test10.c:1656
char output[3000]
dictionary context
Definition: test2.py:57
void test1_onConnect1(void *context, MQTTAsync_successData *response)
int failures
START_TIME_TYPE start_clock(void)
void test1_onDisconnect3(void *context, MQTTAsync_successData *response)
long elapsed(START_TIME_TYPE start_time)
char *const * serverURIs
Definition: MQTTAsync.h:1277
enum MQTTReasonCodes rc
Definition: test10.c:1112
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
int test_finished
void test2_onDisconnect(void *context, MQTTAsync_successData *response)
int test_no
Definition: test1.c:54
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1387


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48