test_persistence.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2012, 2018 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs - correct some compile warnings
16  * Ian Craggs - add binary will message test
17  * Ian Craggs - MQTT V5 updates
18  *******************************************************************************/
19 
20 
28 #include "MQTTAsync.h"
29 #include <string.h>
30 #include <stdlib.h>
31 #include "Thread.h"
32 
33 #if !defined(_WINDOWS)
34  #include <sys/time.h>
35  #include <sys/socket.h>
36  #include <unistd.h>
37  #include <errno.h>
38 #else
39  #include <windows.h>
40 #endif
41 
42 char unique[50]; // unique suffix/prefix to add to clientid/topic etc
43 
44 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
45 
46 void usage(void)
47 {
48  printf("help!!\n");
49  exit(EXIT_FAILURE);
50 }
51 
52 struct Options
53 {
54  char* connection;
55  char* proxy_connection;
56  int verbose;
57  int test_no;
58 } options =
59 {
60  "mqtt.eclipse.org:1883",
61  "localhost:1883",
62  0,
63  0,
64 };
65 
66 void getopts(int argc, char** argv)
67 {
68  int count = 1;
69 
70  while (count < argc)
71  {
72  if (strcmp(argv[count], "--test_no") == 0)
73  {
74  if (++count < argc)
75  options.test_no = atoi(argv[count]);
76  else
77  usage();
78  }
79  else if (strcmp(argv[count], "--connection") == 0)
80  {
81  if (++count < argc)
82  options.connection = argv[count];
83  else
84  usage();
85  }
86  else if (strcmp(argv[count], "--proxy_connection") == 0)
87  {
88  if (++count < argc)
90  else
91  usage();
92  }
93  else if (strcmp(argv[count], "--verbose") == 0)
94  options.verbose = 1;
95  count++;
96  }
97 }
98 
99 
100 #define LOGA_DEBUG 0
101 #define LOGA_INFO 1
102 #include <stdarg.h>
103 #include <time.h>
104 #include <sys/timeb.h>
105 void MyLog(int LOGA_level, char* format, ...)
106 {
107  static char msg_buf[256];
108  va_list args;
109  struct timeb ts;
110 
111  struct tm *timeinfo;
112 
113  if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
114  return;
115 
116  ftime(&ts);
117  timeinfo = localtime(&ts.time);
118  strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
119 
120  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
121 
122  va_start(args, format);
123  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf),
124  format, args);
125  va_end(args);
126 
127  printf("%s\n", msg_buf);
128  fflush(stdout);
129 }
130 
131 void MySleep(long milliseconds)
132 {
133 #if defined(_WIN32) || defined(_WIN64)
134  Sleep(milliseconds);
135 #else
136  usleep(milliseconds*1000);
137 #endif
138 }
139 
140 #if defined(_WIN32) || defined(_WINDOWS)
141 #define START_TIME_TYPE DWORD
142 static DWORD start_time = 0;
144 {
145  return GetTickCount();
146 }
147 #elif defined(AIX)
148 #define START_TIME_TYPE struct timespec
150 {
151  static struct timespec start;
152  clock_gettime(CLOCK_REALTIME, &start);
153  return start;
154 }
155 #else
156 #define START_TIME_TYPE struct timeval
157 /* TODO - unused - remove? static struct timeval start_time; */
159 {
160  struct timeval start_time;
161  gettimeofday(&start_time, NULL);
162  return start_time;
163 }
164 #endif
165 
166 #if defined(_WIN32)
167 long elapsed(START_TIME_TYPE start_time)
168 {
169  return GetTickCount() - start_time;
170 }
171 #elif defined(AIX)
172 #define assert(a)
173 long elapsed(struct timespec start)
174 {
175  struct timespec now, res;
176 
177  clock_gettime(CLOCK_REALTIME, &now);
178  ntimersub(now, start, res);
179  return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
180 }
181 #else
182 long elapsed(START_TIME_TYPE start_time)
183 {
184  struct timeval now, res;
185 
186  gettimeofday(&now, NULL);
187  timersub(&now, &start_time, &res);
188  return (res.tv_sec) * 1000 + (res.tv_usec) / 1000;
189 }
190 #endif
191 
192 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
193 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
194 
195 #define MAXMSGS 30;
196 
197 int tests = 0;
198 int failures = 0;
199 FILE* xml;
201 char output[3000];
203 
204 
206 {
207  long duration = elapsed(global_start_time);
208 
209  fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
210  if (cur_output != output)
211  {
212  fprintf(xml, "%s", output);
213  cur_output = output;
214  }
215  fprintf(xml, "</testcase>\n");
216 }
217 
218 void myassert(char* filename, int lineno, char* description, int value,
219  char* format, ...)
220 {
221  ++tests;
222  if (!value)
223  {
224  va_list args;
225 
226  ++failures;
227  MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s", filename,
228  lineno, description);
229 
230  va_start(args, format);
231  vprintf(format, args);
232  va_end(args);
233 
234  cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
235  description, filename, lineno);
236  }
237  else
238  MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s",
239  filename, lineno, description);
240 }
241 
242 
244 {
245  int i = 0;
246 
247  for (i = 0; i < props->count; ++i)
248  {
249  int id = props->array[i].identifier;
250  const char* name = MQTTPropertyName(id);
251  char* intformat = "Property name %s value %d";
252 
253  switch (MQTTProperty_getType(id))
254  {
256  MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.byte);
257  break;
259  MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer2);
260  break;
262  MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer4);
263  break;
265  MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer4);
266  break;
269  MyLog(LOGA_DEBUG, "Property name %s value len %.*s", name,
270  props->array[i].value.data.len, props->array[i].value.data.data);
271  break;
273  MyLog(LOGA_DEBUG, "Property name %s key %.*s value %.*s", name,
274  props->array[i].value.data.len, props->array[i].value.data.data,
275  props->array[i].value.value.len, props->array[i].value.value.data);
276  break;
277  }
278  }
279 }
280 
281 char willTopic[100];
282 char test_topic[50];
283 
284 /*********************************************************************
285 
286 Test7: Fill up TCP buffer with QoS 0 messages
287 
288 *********************************************************************/
294 
295 int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
296 {
297  MQTTAsync c = (MQTTAsync)context;
298  static int message_count = 0;
299 
300  MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
301 
302  if (memcmp(message->payload, "will message", message->payloadlen) == 0)
304  else
306 
307  MQTTAsync_freeMessage(&message);
308  MQTTAsync_free(topicName);
309 
310  return 1;
311 }
312 
313 void test7cConnected(void* context, char* cause)
314 {
315  MQTTAsync c = (MQTTAsync)context;
316 
317  MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
318  test7c_connected = 1;
319 }
320 
322 {
323  MyLog(LOGA_DEBUG, "In c connect onFailure callback, context %p", context);
324 
326  test7Finished = 1;
327 }
328 
330 {
331  MQTTAsync c = (MQTTAsync)context;
333 
334  MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
335 
336  /* send a message to the proxy to break the connection */
337  pubmsg.payload = "TERMINATE";
338  pubmsg.payloadlen = (int)strlen(pubmsg.payload);
339  pubmsg.qos = 0;
340  pubmsg.retained = 0;
341  //rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
342  //assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
343 }
344 
345 
346 int test7(struct Options options)
347 {
348  char* testname = "test7";
349  int subsqos = 2;
350  MQTTAsync c;
354  int rc = 0;
355  int count = 0;
356  char clientidc[50];
357  int i = 0;
358 
361  test7Finished = 0;
363  test7c_connected = 0;
364 
365  sprintf(willTopic, "paho-test95-7-%s", unique);
366  sprintf(clientidc, "paho-test9-7-c-%s", "same"); //unique);
367  sprintf(test_topic, "longer paho-test9-7-test topic %s", unique);
368 
369  test7Finished = 0;
370  failures = 0;
371  MyLog(LOGA_INFO, "Starting Offline buffering 7 - many persisted messages");
372  fprintf(xml, "<testcase classname=\"test7\" name=\"%s\"", testname);
374 
375  createOpts.MQTTVersion = MQTTVERSION_5;
376  createOpts.allowDisconnectedSendAtAnyTime = 1;
377  createOpts.sendWhileDisconnected = 1;
378  createOpts.maxBufferedMessages = 64000;
379  createOpts.persistQoS0 = 1;
380  printf("Create starting\n");
381  START_TIME_TYPE start = start_clock();
383  NULL, &createOpts);
384  long duration = elapsed(start);
385  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
386  if (rc != MQTTASYNC_SUCCESS)
387  {
388  MQTTAsync_destroy(&c);
389  goto exit;
390  }
391  printf("Create finished after %ld ms\n", duration);
392  MQTTAsync_token *tokens, *cur_token;
393  MQTTAsync_getPendingTokens(c, &tokens);
394  int token_count = 0;
395  if ((cur_token = tokens) != NULL)
396  {
397  while (*cur_token != -1)
398  {
399  cur_token++;
400  token_count++;
401  }
402  }
403  printf("%d messages restored\n", token_count);
404  if (tokens)
405  MQTTAsync_free(tokens);
406 
407  opts.keepAliveInterval = 20;
408  opts.cleansession = 1;
409 
410  rc = MQTTAsync_setCallbacks(c, c, NULL, test7_messageArrived, NULL);
411  assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
412 
413 #if 0
414  opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
415  opts.context = c;
418  MyLog(LOGA_DEBUG, "Connecting client c");
419  rc = MQTTAsync_connect(c, &opts);
420  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
421  if (rc != MQTTASYNC_SUCCESS)
422  {
423  failures++;
424  goto exit;
425  }
426 
427  /* wait until d is ready: connected and subscribed */
428  count = 0;
429  while (!test7cReady && ++count < 10000)
430  {
431  if (test7Finished)
432  goto exit;
433  MySleep(100);
434  }
435  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
436 #endif
437 
439  assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
440 
441  /* let client c go: connect, and send disconnect command to proxy */
442  opts.will = &wopts;
443  opts.will->payload.data = "will message";
444  opts.will->payload.len = (int)strlen(opts.will->payload.data) + 1;
445  opts.will->qos = 1;
446  opts.will->retained = 0;
447  opts.will->topicName = willTopic;
450  opts.context = c;
451  opts.cleansession = 0;
452  /*opts.automaticReconnect = 1;
453  opts.minRetryInterval = 3;
454  opts.maxRetryInterval = 6;*/
455 
456 #if 0
457  MyLog(LOGA_DEBUG, "Connecting client c");
458  rc = MQTTAsync_connect(c, &opts);
459  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
460  if (rc != MQTTASYNC_SUCCESS)
461  {
462  failures++;
463  goto exit;
464  }
465 
466  count = 0;
467  while (!test7c_connected && ++count < 10000)
468  MySleep(100);
469  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
470 #endif
471 
472  /* wait for will message */
473  //while (test7_will_message_received == 0 && ++count < 10000)
474  // MySleep(100);
475 
476  MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP");
477 
478  test7c_connected = 0;
479 #define PAYLOAD_LEN 500
480  char buf[PAYLOAD_LEN];
481  /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
482  for (i = 0; i < 50000; ++i)
483  {
486  pubmsg.qos = i % 3;
487  sprintf(buf, "QoS %d message", pubmsg.qos);
488  pubmsg.payload = buf;
489  pubmsg.payloadlen = PAYLOAD_LEN;
490  pubmsg.retained = 0;
491  rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &pubopts);
492  assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
493  if (rc != MQTTASYNC_SUCCESS)
494  {
495  MySleep(3000);
496  MyLog(LOGA_DEBUG, "Connecting client c");
497  rc = MQTTAsync_connect(c, &opts);
498  assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
499  if (rc != MQTTASYNC_SUCCESS)
500  {
501  failures++;
502  goto exit;
503  }
504 
505  count = 0;
506  while (!test7c_connected && ++count < 10000)
507  MySleep(100);
508  assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
509  MySleep(3000);
510  break;
511  }
512  }
513 
514 exit:
515  rc = MQTTAsync_disconnect(c, NULL);
516  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
517 
518  /*rc = MQTTAsync_disconnect(d, NULL);
519  assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);*/
520 
521  MySleep(200);
522  MQTTAsync_destroy(&c);
523  MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
524  (failures == 0) ? "passed" : "failed", testname, tests, failures);
526  return failures;
527 }
528 
529 
530 
531 void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
532 {
533  printf("%s\n", message);
534 }
535 
536 
537 int main(int argc, char** argv)
538 {
539  int* numtests = &tests;
540  int rc = 0;
541  int (*tests[])() = { NULL, test7 };
542  time_t randtime;
543 
544  srand((unsigned) time(&randtime));
545  sprintf(unique, "%u", rand());
546  MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
547 
548  xml = fopen("TEST-test9.xml", "w");
549  fprintf(xml, "<testsuite name=\"test9\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
550 
552  getopts(argc, argv);
553 
554  if (options.test_no == 0)
555  { /* run all the tests */
557  {
558  failures = 0;
560  rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
561  }
562  }
563  else
564  {
566  rc = tests[options.test_no](options); /* run just the selected test */
567  }
568 
569  MyLog(LOGA_INFO, "Total tests run: %d", *numtests);
570  if (rc == 0)
571  MyLog(LOGA_INFO, "verdict pass");
572  else
573  MyLog(LOGA_INFO, "verdict fail");
574 
575  fprintf(xml, "</testsuite>\n");
576  fclose(xml);
577 
578  return rc;
579 }
580 
int test7Finished
enum MQTTPropertyCodes value
int test7(struct Options options)
int MQTTAsync_createWithOptions(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTAsync_createOptions *options)
Definition: MQTTAsync.c:575
void test7cOnConnectSuccess(void *context, MQTTAsync_successData5 *response)
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
char * cur_output
MQTTProperties props
Definition: paho_c_pub.c:54
char * proxy_connection
Definition: test1.c:51
#define assert(a, b, c, d)
MQTTLenString value
char * connection
const char * topicName
Definition: MQTTAsync.h:994
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
int test7OnFailureCalled
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
#define PAYLOAD_LEN
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
void usage(void)
struct pubsub_opts opts
Definition: paho_c_pub.c:42
char unique[50]
void * MQTTAsync
Definition: MQTTAsync.h:239
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
#define START_TIME_TYPE
void MQTTAsync_free(void *memory)
Definition: MQTTAsync.c:2626
int MQTTProperty_getType(enum MQTTPropertyCodes value)
void MQTTAsync_freeMessage(MQTTAsync_message **message)
Definition: MQTTAsync.c:2615
int MQTTAsync_setConnected(MQTTAsync handle, void *context, MQTTAsync_connected *connected)
Definition: MQTTAsync.c:3178
int test7c_connected
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char *message)
std::tm localtime(std::time_t time)
Definition: chrono.h:292
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
Definition: MQTTAsync.c:4903
FILE * xml
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
MQTTASYNC_TRACE_LEVELS
Definition: MQTTAsync.h:1650
int test7_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
static char msg_buf[512]
Definition: Log.c:122
void MySleep(long milliseconds)
int test7_messages_received
#define MQTTAsync_willOptions_initializer
Definition: MQTTAsync.h:1014
constexpr size_t count()
Definition: core.h:960
#define MQTTAsync_connectOptions_initializer5
Definition: MQTTAsync.h:1338
int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
Definition: MQTTAsync.c:4737
void MyLog(int LOGA_level, char *format,...)
void getopts(int argc, char **argv)
const char * MQTTPropertyName(enum MQTTPropertyCodes value)
description
Definition: setup.py:19
int message_count
Definition: test5.c:72
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.h:1327
START_TIME_TYPE start_clock(void)
enum MQTTPropertyCodes identifier
char test_topic[50]
struct MQTTAsync_willOptions::@54 payload
void myassert(char *filename, int lineno, char *description, int value, char *format,...)
#define MQTTAsync_createOptions_initializer
Definition: MQTTAsync.h:965
int tests
#define ARRAY_SIZE(a)
int MQTTAsync_token
Definition: MQTTAsync.h:249
void write_test_result(void)
long elapsed(START_TIME_TYPE start_time)
MQTTAsync_willOptions * will
Definition: MQTTAsync.h:1214
#define MQTTCLIENT_PERSISTENCE_DEFAULT
const char * name
MQTTProperty * array
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
void test7cConnected(void *context, char *cause)
START_TIME_TYPE global_start_time
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
Definition: MQTTAsync.c:4897
int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const MQTTAsync_message *message, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4328
MQTTClient c
Definition: test10.c:1656
float time
Definition: mqtt_test.py:17
dictionary context
Definition: test2.py:57
void test7cOnConnectFailure(void *context, MQTTAsync_failureData5 *response)
#define LOGA_DEBUG
#define LOGA_INFO
int test7_will_message_received
int failures
struct Options options
const void * data
Definition: MQTTAsync.h:1010
void logProperties(MQTTProperties *props)
char output[3000]
char willTopic[100]
int main(int argc, char **argv)
#define MQTTAsync_message_initializer
Definition: MQTTAsync.h:319
enum MQTTReasonCodes rc
Definition: test10.c:1112
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:1321
int test_no
Definition: test1.c:54


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48