test_issue373.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2012, 2017 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *******************************************************************************/
15 
22 #include "MQTTAsync.h"
23 #include <string.h>
24 #include <stdlib.h>
25 #include "Thread.h"
26 
27 #if !defined(_WINDOWS)
28 #include <sys/time.h>
29 #include <sys/socket.h>
30 #include <unistd.h>
31 #include <errno.h>
32 #else
33 #include <windows.h>
34 #endif
35 #include "Heap.h" // for Heap_get_info
36 // undefine macros from Heap.h:
37 #undef malloc
38 #undef realloc
39 #undef free
40 
41 char unique[50]; // unique suffix/prefix to add to clientid/topic etc
42 
43 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
44 
45 void usage(void)
46 {
47  printf("help!!\n");
48  exit(EXIT_FAILURE);
49 }
50 
51 struct Options
52 {
53  char* connection;
54  char* proxy_connection;
55  int verbose;
56  int test_no;
57  unsigned int QoS;
58  unsigned int iterrations;
59 } options =
60 {
61  "localhost:1883",
62  "localhost:1884",
63  0,
64  0,
65  0,
66  5
67 };
68 
69 void getopts(int argc, char** argv)
70 {
71  int count = 1;
72 
73  while (count < argc)
74  {
75  if (strcmp(argv[count], "--test_no") == 0)
76  {
77  if (++count < argc)
78  options.test_no = atoi(argv[count]);
79  else
80  usage();
81  }
82  else if (strcmp(argv[count], "--connection") == 0)
83  {
84  if (++count < argc)
85  options.connection = argv[count];
86  else
87  usage();
88  }
89  else if (strcmp(argv[count], "--proxy_connection") == 0)
90  {
91  if (++count < argc)
93  else
94  usage();
95  }
96  else if (strcmp(argv[count], "--verbose") == 0)
97  options.verbose = 1;
98  count++;
99  }
100 }
101 
102 
103 #define LOGA_DEBUG 0
104 #define LOGA_INFO 1
105 #include <stdarg.h>
106 #include <time.h>
107 #include <sys/timeb.h>
108 void MyLog(int LOGA_level, char* format, ...)
109 {
110  static char msg_buf[256];
111  va_list args;
112 #if defined(_WIN32) || defined(_WINDOWS)
113  struct timeb ts;
114 #else
115  struct timeval ts;
116 #endif
117  struct tm *timeinfo;
118 
119  if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
120  return;
121 
122 #if defined(_WIN32) || defined(_WINDOWS)
123  ftime(&ts);
124  timeinfo = localtime(&ts.time);
125 #else
126  gettimeofday(&ts, NULL);
127  timeinfo = localtime(&ts.tv_sec);
128 #endif
129  strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
130 
131 #if defined(_WIN32) || defined(_WINDOWS)
132  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
133 #else
134  sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000);
135 #endif
136 
137  va_start(args, format);
138  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
139  va_end(args);
140 
141  printf("%s\n", msg_buf);
142  fflush(stdout);
143 }
144 
145 void MySleep(long milliseconds)
146 {
147 #if defined(_WIN32) || defined(_WIN64)
148  Sleep(milliseconds);
149 #else
150  usleep(milliseconds*1000);
151 #endif
152 }
153 
154 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
155 
156 int tests = 0;
157 int failures = 0;
158 int connected = 0;
159 int pendingMessageCnt = 0; /* counter of messages which are currently queued for publish */
163 int connectCnt = 0;
164 int connecting = 0;
165 
166 void myassert(char* filename, int lineno, char* description, int value,
167  char* format, ...)
168 {
169  ++tests;
170  if (!value)
171  {
172  va_list args;
173 
174  ++failures;
175  MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s", filename,
176  lineno, description);
177 
178  va_start(args, format);
179  vprintf(format, args);
180  va_end(args);
181  }
182  else
183  MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s",
184  filename, lineno, description);
185 }
186 
187 
189 {
190  MyLog(LOGA_INFO, "In connect onFailure callback, context %p", context);
191  connecting = 0;
192 }
193 
195 {
196  connected = 1;
197  connecting = 0;
198  connectCnt++;
199  MyLog(LOGA_INFO, "Established MQTT connection to %s",response->alt.connect.serverURI);
200  char MqttVersion[40];
201  switch (response->alt.connect.MQTTVersion)
202  {
203  case MQTTVERSION_3_1:
204  sprintf(MqttVersion," MQTT version 3.1");
205  break;
206  case MQTTVERSION_3_1_1:
207  sprintf(MqttVersion, " MQTT version 3.1.1");
208  break;
209  default:
210  sprintf(MqttVersion, " MQTT version %d",response->alt.connect.MQTTVersion);
211  }
212  MyLog(LOGA_INFO, " %s\n",MqttVersion);
213  MyLog(LOGA_INFO, "connectCnt %d\n",connectCnt);
214 }
215 
216 void test373ConnectionLost(void* context, char* cause)
217 {
218  connected = 0;
219  MyLog(LOGA_INFO, "Disconnected from MQTT broker reason %s",cause);
220 }
221 
223 {
224 }
225 
227 {
229  goodPublishCnt++;
230 }
231 
233 {
236 }
237 
238 int test373_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
239 {
240  return 0;
241 }
242 
243 static char test373Payload[] = "No one is interested in this payload";
244 
245 int test373SendPublishMessage(MQTTAsync handle,int id, const unsigned int QoS)
246 {
247  int rc = 0;
250  char topic[ sizeof(unique) + 40];
251 
252  sprintf(topic,"%s/test373/item_%03d",unique,id);
255 
256  pubmsg.payload = test373Payload;
257  pubmsg.payloadlen = sizeof(test373Payload);
258  pubmsg.qos = QoS;
259  rc = MQTTAsync_sendMessage( handle, topic,&pubmsg,&opts);
260  if (rc == MQTTASYNC_SUCCESS)
261  {
264  }
265  else
266  {
267  MyLog(LOGA_INFO, "Failed to queue message for send with retvalue %d",rc);
268  }
269  return rc;
270 }
271 
273 {
274  char* testname = "test373";
275  MQTTAsync mqttasyncContext;
278  int rc = 0;
279  char clientid[30 + sizeof(unique)];
280  heap_info* mqtt_mem = 0;
281 
282  MyLog(LOGA_INFO, "Running test373 with QoS=%u, iterrations=%u\n",options.QoS,options.iterrations);
283  sprintf(clientid, "paho-test373-%s", unique);
284  connectCnt = 0;
285  rc = MQTTAsync_create(&mqttasyncContext, options.proxy_connection, clientid,
287  NULL);
288  assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
289  if (rc != MQTTASYNC_SUCCESS)
290  {
291  goto exit;
292  }
293  opts.connectTimeout = 2;
294  opts.keepAliveInterval = 20;
295  opts.cleansession = 0;
299  opts.context = mqttasyncContext;
300 
301  rc = MQTTAsync_setCallbacks(mqttasyncContext,mqttasyncContext,
305  if (rc != MQTTASYNC_SUCCESS)
306  {
307  goto exit;
308  }
310  while (connectCnt < (int)options.iterrations)
311  {
312  if (!connected)
313  {
314  MyLog(LOGA_INFO, "Connected %d connectCnt %d\n",connected,connectCnt);
315  MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
317 #if !defined(_WINDOWS)
318  mqtt_mem = Heap_get_info();
319  MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
320 #endif
321  /* (re)connect to the broker */
322  if (connecting)
323  {
324  MySleep((1+opts.connectTimeout) * 1000); /* but wait for all pending connect attempts to timeout */
325  }
326  else
327  {
328  rc = MQTTAsync_connect(mqttasyncContext, &opts);
329  if (rc != MQTTASYNC_SUCCESS)
330  {
331  failures++;
332  goto exit;
333  }
334  connecting = 1;
335  }
336  }
337  else
338  {
339  /* while connected send 100 message per second */
340  int topicId;
341  for(topicId=0; topicId < 100; topicId++)
342  {
343  rc = test373SendPublishMessage(mqttasyncContext,topicId,options.QoS);
344  if (rc != MQTTASYNC_SUCCESS) break;
345  }
346  MySleep(100);
347  }
348  }
349  MySleep(5000);
350  MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
352 #if !defined(_WINDOWS)
353  mqtt_mem = Heap_get_info();
354  MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
355 #endif
356  MQTTAsync_disconnect(mqttasyncContext, NULL);
357  connected = 0;
358  MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
360 #if !defined(_WINDOWS)
361  mqtt_mem = Heap_get_info();
362  MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
363 #endif
364 exit:
365  MQTTAsync_destroy(&mqttasyncContext);
366 #if !defined(_WINDOWS)
367  mqtt_mem = Heap_get_info();
368  MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
369  if (mqtt_mem->current_size > 0) failures++; /* consider any not freed memory as failure */
370 #endif
371  return failures;
372 }
373 
374 void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
375 {
376  printf("%s\n", message);
377 }
378 
379 int main(int argc, char** argv)
380 {
381  int* numtests = &tests;
382  int rc = 0;
383  int (*tests[])() = { NULL, test_373};
384  unsigned int QoS;
385 
386  sprintf(unique, "%u", rand());
387  MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
388 
390  getopts(argc, argv);
391 
392  if (options.test_no == 0)
393  { /* run all the tests */
395  {
396  /* test with QoS 0, 1 and 2 and just 5 iterrations */
397  for (QoS = 0; QoS < 3; QoS++)
398  {
399  failures = 0;
400  options.QoS = QoS;
401  options.iterrations = 5;
403  rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
404  }
405  if (rc == 0)
406  {
407  /* Test with much more iterrations for QoS = 0 */
408  failures = 0;
409  options.QoS = 0;
410  options.iterrations = 100;
412  rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
413  }
414  }
415  }
416  else
417  {
419  rc = tests[options.test_no](options); /* run just the selected test */
420  }
421 
422  if (rc == 0)
423  MyLog(LOGA_INFO, "verdict pass");
424  else
425  MyLog(LOGA_INFO, "verdict fail");
426 
427  return rc;
428 }
429 
430 
431 /* Local Variables: */
432 /* indent-tabs-mode: t */
433 /* c-basic-offset: 8 */
434 /* End: */
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
#define ARRAY_SIZE(a)
Definition: test_issue373.c:43
void test373DeliveryComplete(void *context, MQTTAsync_token token)
enum MQTTPropertyCodes value
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
string topic
Definition: test2.py:8
int test373_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
int tests
char * proxy_connection
Definition: test1.c:51
union MQTTAsync_successData::@46 alt
char * connection
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
void test373_onWriteSuccess(void *context, MQTTAsync_successData *response)
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
struct pubsub_opts opts
Definition: paho_c_pub.c:42
void * MQTTAsync
Definition: MQTTAsync.h:239
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char *message)
struct Options options
#define LOGA_DEBUG
std::tm localtime(std::time_t time)
Definition: chrono.h:292
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
Definition: MQTTAsync.c:4903
struct MQTTAsync_successData::@46::@48 connect
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
void getopts(int argc, char **argv)
Definition: test_issue373.c:69
MQTTASYNC_TRACE_LEVELS
Definition: MQTTAsync.h:1650
static char msg_buf[512]
Definition: Log.c:122
int failures
void MySleep(long milliseconds)
void MyLog(int LOGA_level, char *format,...)
unsigned int iterrations
Definition: test_issue373.c:58
#define MQTTAsync_willOptions_initializer
Definition: MQTTAsync.h:1014
constexpr size_t count()
Definition: core.h:960
int test_373(struct Options options)
int connecting
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTAsync.c:737
void test373ConnectionLost(void *context, char *cause)
int pendingMessageCnt
char unique[50]
Definition: test_issue373.c:41
int test373SendPublishMessage(MQTTAsync handle, int id, const unsigned int QoS)
description
Definition: setup.py:19
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:702
#define MQTTVERSION_3_1_1
Definition: MQTTAsync.h:203
heap_info * Heap_get_info(void)
Definition: Heap.c:432
int goodPublishCnt
#define MQTTAsync_connectOptions_initializer
Definition: MQTTAsync.h:1335
int MQTTAsync_token
Definition: MQTTAsync.h:249
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
Definition: Heap.h:62
#define assert(a, b, c, d)
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
void test373OnConnect(void *context, MQTTAsync_successData *response)
void test373_onWriteFailure(void *context, MQTTAsync_failureData *response)
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
Definition: MQTTAsync.c:4897
int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const MQTTAsync_message *message, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4328
dictionary context
Definition: test2.py:57
char * clientid
Definition: test6.c:54
int connected
#define MQTTCLIENT_PERSISTENCE_NONE
static char test373Payload[]
int main(int argc, char **argv)
void test1373OnFailure(void *context, MQTTAsync_failureData *response)
unsigned int QoS
Definition: test_issue373.c:57
#define MQTTAsync_message_initializer
Definition: MQTTAsync.h:319
#define MQTTVERSION_3_1
Definition: MQTTAsync.h:199
void usage(void)
Definition: test_issue373.c:45
enum MQTTReasonCodes rc
Definition: test10.c:1112
int connectCnt
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
int pendingMessageCntMax
#define MQTTVERSION_DEFAULT
Definition: MQTTAsync.h:195
int failedPublishCnt
int test_no
Definition: test1.c:54
#define LOGA_INFO
void myassert(char *filename, int lineno, char *description, int value, char *format,...)


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48