test6.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2011, 2020 IBM Corp. and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  *******************************************************************************/
16 
17 
23 #include "MQTTAsync.h"
24 /*#define NO_HEAP_TRACKING 1
25 #include "Heap.h"*/
26 #include <string.h>
27 #include <stdlib.h>
28 
29 #if !defined(_WINDOWS)
30  #include <sys/time.h>
31  #include <unistd.h>
32  #include <signal.h>
33 #else
34  #include <windows.h>
35 #endif
36 
/**
 * Print the command line options understood by getopts() and terminate.
 *
 * getopts() calls this when an option is given without its value, or with a
 * value it does not accept. The function never returns: it ends the process
 * with EXIT_FAILURE.
 */
void usage(void)
{
	fputs("Options:\n"
		"  --qos 0|1|2                    QoS used for the test messages\n"
		"  --slot_no <n>                  number appended to the topic and client id\n"
		"  --connection <uri>             server URI for the test client\n"
		"  --connections \"<uri> <uri>..\"  space separated list of server URIs\n"
		"  --control_connection <uri>     server URI for the control client\n"
		"  --clientid <id>                client id prefix\n"
		"  --username <name>              user name for the test connection\n"
		"  --password <password>          password for the test connection\n"
		"  --persistent                   enable the persistence option\n"
		"  --verbose                      also log debug messages\n",
		stderr);
	exit(EXIT_FAILURE);
}
42 
43 static char pub_topic[200];
44 static char sub_topic[200];
45 
46 struct
47 {
48  char* connection;
49  char** connections;
52  char* topic;
54  char* clientid;
55  int slot_no;
56  int qos;
57  int retained;
58  char* username;
59  char* password;
60  int verbose;
62 } opts =
63 {
64  "tcp://localhost:1884",
65  NULL,
66  0,
67  "tcp://localhost:7777",
68  "Eclipse/Paho/restart_test",
69  "Eclipse/Paho/restart_test/control",
70  "C_broken_client",
71  1,
72  2,
73  0,
74  NULL,
75  NULL,
76  0,
77  0,
78 };
79 
80 void getopts(int argc, char** argv)
81 {
82  int count = 1;
83 
84  while (count < argc)
85  {
86  if (strcmp(argv[count], "--qos") == 0)
87  {
88  if (++count < argc)
89  {
90  if (strcmp(argv[count], "0") == 0)
91  opts.qos = 0;
92  else if (strcmp(argv[count], "1") == 0)
93  opts.qos = 1;
94  else if (strcmp(argv[count], "2") == 0)
95  opts.qos = 2;
96  else
97  usage();
98  }
99  else
100  usage();
101  }
102  else if (strcmp(argv[count], "--slot_no") == 0)
103  {
104  if (++count < argc)
105  opts.slot_no = atoi(argv[count]);
106  else
107  usage();
108  }
109  else if (strcmp(argv[count], "--connection") == 0)
110  {
111  if (++count < argc)
112  opts.connection = argv[count];
113  else
114  usage();
115  }
116  else if (strcmp(argv[count], "--connections") == 0)
117  {
118  if (++count < argc)
119  {
120  opts.connection_count = 0;
121  opts.connections = malloc(sizeof(char*) * 5);
122  char* tok = strtok(argv[count], " ");
123  while (tok)
124  {
125  opts.connections[opts.connection_count] = malloc(strlen(tok)+1);
126  strcpy(opts.connections[opts.connection_count], tok);
127  opts.connection_count++;
128  tok = strtok(NULL, " ");
129  }
130  }
131  else
132  usage();
133  }
134  else if (strcmp(argv[count], "--control_connection") == 0)
135  {
136  if (++count < argc)
137  opts.control_connection = argv[count];
138  else
139  usage();
140  }
141  else if (strcmp(argv[count], "--clientid") == 0)
142  {
143  if (++count < argc)
144  opts.clientid = argv[count];
145  else
146  usage();
147  }
148  else if (strcmp(argv[count], "--username") == 0)
149  {
150  if (++count < argc)
151  opts.username = argv[count];
152  else
153  usage();
154  }
155  else if (strcmp(argv[count], "--password") == 0)
156  {
157  if (++count < argc)
158  opts.password = argv[count];
159  else
160  usage();
161  }
162  else if (strcmp(argv[count], "--persistent") == 0)
163  opts.persistence = 1;
164  else if (strcmp(argv[count], "--verbose") == 0)
165  opts.verbose = 1;
166  count++;
167  }
168 }
169 
170 #define LOGA_DEBUG 0
171 #define LOGA_ALWAYS 1
172 #define LOGA_INFO 2
173 #include <stdarg.h>
174 #include <time.h>
175 #include <sys/timeb.h>
176 void MyLog(int LOGA_level, char* format, ...)
177 {
178  static char msg_buf[256];
179  va_list args;
180 #if defined(_WIN32) || defined(_WINDOWS)
181  struct timeb ts;
182 #else
183  struct timeval ts;
184 #endif
185  struct tm timeinfo;
186 
187  if (LOGA_level == LOGA_DEBUG && opts.verbose == 0)
188  return;
189 
190 #if defined(_WIN32) || defined(_WINDOWS)
191  ftime(&ts);
192  localtime_s(&timeinfo, &ts.time);
193 #else
194  gettimeofday(&ts, NULL);
195  localtime_r(&ts.tv_sec, &timeinfo);
196 #endif
197  strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
198 
199 #if defined(_WIN32) || defined(_WINDOWS)
200  sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
201 #else
202  sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000);
203 #endif
204 
205  va_start(args, format);
206  vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
207  va_end(args);
208 
209  printf("%s\n", msg_buf);
210  fflush(stdout);
211 }
212 
/**
 * Suspend the calling thread for the given number of milliseconds.
 *
 * Negative values are treated as zero. On non-Windows platforms the sleep
 * may end early if a signal is delivered.
 *
 * @param milliseconds how long to sleep
 */
void MySleep(long milliseconds)
{
	if (milliseconds < 0)
		milliseconds = 0;
#if defined(_WIN32) || defined(_WIN64) || defined(_WINDOWS)
	Sleep((DWORD)milliseconds);
#else
	{
		/* nanosleep rather than usleep: usleep is only specified for
		 * intervals below one second, and callers here sleep for longer */
		struct timespec delay;

		delay.tv_sec = (time_t)(milliseconds / 1000);
		delay.tv_nsec = (milliseconds % 1000) * 1000000L;
		nanosleep(&delay, NULL);
	}
#endif
}
221 
222 #if defined(_WIN32) || defined(_WINDOWS)
223 #define START_TIME_TYPE DWORD
224 static DWORD start_time = 0;
226 {
227  return GetTickCount();
228 }
229 #elif defined(AIX)
230 #define START_TIME_TYPE struct timespec
232 {
233  static struct timespec start;
234  clock_gettime(CLOCK_REALTIME, &start);
235  return start;
236 }
237 #else
238 #define START_TIME_TYPE struct timeval
239 /* TODO - unused - remove? static struct timeval start_time; */
241 {
242  struct timeval start_time;
243  gettimeofday(&start_time, NULL);
244  return start_time;
245 }
246 #endif
247 
248 #if defined(_WIN32)
249 long elapsed(START_TIME_TYPE start_time)
250 {
251  return GetTickCount() - start_time;
252 }
253 #elif defined(AIX)
254 #define assert(a)
255 long elapsed(struct timespec start)
256 {
257  struct timespec now, res;
258 
259  clock_gettime(CLOCK_REALTIME, &now);
260  ntimersub(now, start, res);
261  return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
262 }
263 #else
264 long elapsed(START_TIME_TYPE start_time)
265 {
266  struct timeval now, res;
267 
268  gettimeofday(&now, NULL);
269  timersub(&now, &start_time, &res);
270  return (res.tv_sec)*1000 + (res.tv_usec)/1000;
271 }
272 #endif
273 
277 int arrivedCount = 0;
279 int measuring = 0;
280 long roundtrip_time = 0L;
281 int errors = 0;
282 int stopping = 0;
283 int connection_lost = 0; /* for use with the persistence option */
284 int recreated = 0;
286 
287 char* wait_message = NULL;
288 char* wait_message2 = NULL;
291 int test_count = 1000;
292 
293 void control_connectionLost(void* context, char* cause)
294 {
295  MyLog(LOGA_ALWAYS, "Control connection lost - stopping");
296 
297  stopping = 1;
298 }
299 
308 int control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
309 {
310  MyLog(LOGA_ALWAYS, "Control message arrived: %.*s wait message: %s",
311  m->payloadlen, m->payload, (wait_message == NULL) ? "None" : wait_message);
312  if (strncmp(m->payload, "stop", 4) == 0)
313  {
314  MyLog(LOGA_ALWAYS, "Stop message arrived, stopping...");
315  stopping = 1;
316  }
317  else if (wait_message != NULL && strncmp(wait_message, m->payload,
318  strlen(wait_message)) == 0)
319  {
320  MyLog(LOGA_ALWAYS, "Wait message %s found", wait_message);
321  control_found = 1;
322  wait_message = NULL;
323  }
324  else if (wait_message2 != NULL && strncmp(wait_message2, m->payload,
325  strlen(wait_message2)) == 0)
326  {
327  MyLog(LOGA_ALWAYS, "Wait message2 %s found", wait_message);
328  control_found = 2;
329  wait_message2 = NULL;
330  }
331 
332  MQTTAsync_free(topicName);
334  return 1;
335 }
336 
337 
338 int control_send(char* message)
339 {
340  char buf[156];
341  int rc = 0;
343 
344  sprintf(buf, "%s: %s", opts.clientid, message);
345  MyLog(LOGA_ALWAYS, "Sending control message: %s", message);
346  rc = MQTTAsync_send(control_client, pub_topic, (int)strlen(buf),
347  buf, 1, 0, &ropts);
348  MyLog(LOGA_DEBUG, "Control message sent: %s", buf);
349 
350  return rc;
351 }
352 
353 
354 /* wait for a specific message on the control topic. */
355 int control_wait(char* message)
356 {
357  int count = 0;
358  char buf[120];
359 
360  control_found = 0;
361  wait_message = message;
362 
363  sprintf(buf, "waiting for: %s", message);
364  control_send(buf);
365 
366  MyLog(LOGA_ALWAYS, "Waiting for: %s", message);
367  while (control_found == 0 && stopping == 0)
368  {
369  if (++count == 300)
370  {
371  stopping = 1;
372  MyLog(LOGA_ALWAYS, "Failed to receive message %s, stopping ", message);
373  return 0; /* time out and tell the caller the message was not found */
374  }
375  MySleep(1000);
376  }
377  MyLog(LOGA_ALWAYS, "Control message found: %s, control_found %d", message, control_found);
378  return control_found;
379 }
380 
381 
382 /* wait for a specific message on the control topic. */
383 int control_which(char* message1, char* message2)
384 {
385  int count = 0;
386  control_found = 0;
387  wait_message = message1;
388  wait_message2 = message2;
389 
390  while (control_found == 0)
391  {
392  if (++count == 300)
393  break; /* time out and tell the caller the message was not found */
394  MySleep(1000);
395  }
396  return control_found;
397 }
398 
399 
401 
402 int messageArrived(void* context, char* topicName, int topicLen,
404 {
405  int seqno = -1;
406  char* token = NULL;
407 
408  token = strtok(m->payload, " ");
409  token = strtok(NULL, " ");
410  token = strtok(NULL, " ");
411 
412  if (token)
413  seqno = atoi(token);
414  if (m->qos != opts.qos)
415  {
416  MyLog(LOGA_ALWAYS, "Error, expecting QoS %d but got %d", opts.qos,
417  m->qos);
418  errors++;
419  } else if (seqno != arrivedCount + 1)
420  {
421  if (m->qos == 2 || (m->qos == 1 && seqno > arrivedCount + 1))
422  {
423  if (seqno == -1)
425  "Error, expecting sequence number %d but got message id %d, payload was %.*s",
426  arrivedCount + 1, m->msgid, m->payloadlen, m->payload);
427  else
429  "Error, expecting sequence number %d but got %d message id %d",
430  arrivedCount + 1, seqno, m->msgid);
431  errors++;
432  }
433  }
434  arrivedCount++;
435  MQTTAsync_free(topicName);
437 
440  return 1;
441 }
442 
443 
445 {
446  MQTTAsync c = (MQTTAsync)context;
447 
448  MyLog(LOGA_ALWAYS, "Successfully reconnected");
449 }
450 
451 
453 {
454  MQTTAsync c = (MQTTAsync)context;
455  int rc;
456 
457  MyLog(LOGA_ALWAYS, "Failed to reconnect with return code %d", (response) ? response->code : -9999);
458 
459  conn_opts.context = context;
460  conn_opts.keepAliveInterval = 10;
461  conn_opts.username = opts.username;
462  conn_opts.password = opts.password;
463  conn_opts.cleansession = 0;
464  conn_opts.onSuccess = client_onReconnect;
466  rc = MQTTAsync_connect(c, &conn_opts);
467  if (rc != MQTTASYNC_SUCCESS)
468  {
469  MyLog(LOGA_ALWAYS, "Failed to start reconnect with return code %d", rc);
470  stopping = 1;
471  }
472 }
473 
474 
475 void connectionLost(void* context, char* cause)
476 {
477  MQTTAsync c = (MQTTAsync)context;
478  int rc = 0;
479 
480  MyLog(LOGA_ALWAYS, "Connection lost when %d messages arrived out of %d expected",
482  //dotrace = 1;
483 
484  if (opts.persistence)
485  connection_lost = 1;
486  else
487  {
488  conn_opts.context = context;
489  conn_opts.keepAliveInterval = 10;
490  conn_opts.username = opts.username;
491  conn_opts.password = opts.password;
492  conn_opts.cleansession = 0;
493  conn_opts.onSuccess = client_onReconnect;
495  if (opts.connections)
496  {
497  conn_opts.serverURIcount = opts.connection_count;
498  conn_opts.serverURIs = opts.connections;
499  }
500  else
501  {
502  conn_opts.serverURIcount = 0;
503  conn_opts.serverURIs = NULL;
504  }
505  //printf("reconnecting to first serverURI %s\n", conn_opts.serverURIs[0]);
506  MyLog(LOGA_ALWAYS, "Starting reconnect attempt");
507  rc = MQTTAsync_connect(context, &conn_opts);
508  if (rc != MQTTASYNC_SUCCESS)
509  {
510  MyLog(LOGA_ALWAYS, "Failed to start reconnect with return code %d", rc);
511  stopping = 1;
512  }
513  }
514 }
515 
516 
518 {
519  int rc;
520 
521  if (recreated == 0)
522  {
523  MyLog(LOGA_ALWAYS, "Recreating client");
524 
525  MQTTAsync_destroy(&client); /* destroy the client object so that we force persistence to be read on recreate */
526 #if !defined(_WINDOWS)
527  /*heap_info* mqtt_mem = 0;
528  mqtt_mem = Heap_get_info();
529  MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
530  if (mqtt_mem->current_size > 20)
531  HeapScan(5); */
532 #endif
533  rc = MQTTAsync_create(&client, opts.connection, opts.clientid, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
534  if (rc != MQTTASYNC_SUCCESS)
535  {
536  MyLog(LOGA_ALWAYS, "MQTTAsync_create failed, rc %d", rc);
537  goto exit;
538  }
539 
542  {
543  MyLog(LOGA_ALWAYS, "MQTTAsync_setCallbacks failed, rc %d", rc);
544  goto exit;
545  }
546  recreated = 1;
547  }
548 
549  MyLog(LOGA_ALWAYS, "Reconnecting client");
550  conn_opts.keepAliveInterval = 10;
551  conn_opts.username = opts.username;
552  conn_opts.password = opts.password;
553  conn_opts.cleansession = 0;
554  conn_opts.context = client;
555  conn_opts.onSuccess = client_onReconnect;
557  if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
558  MyLog(LOGA_ALWAYS, "MQTTAsync_connect failed, rc %d", rc);
559  else
560  connection_lost = 0;
561 
562 exit:
563  return rc;
564 }
565 
566 
567 int success(int count)
568 {
569  int rc = 1;
570 
571  if (errors)
572  {
573  MyLog(LOGA_ALWAYS, "Workload test failed because the callback had errors");
574  rc = 0;
575  }
576  if (arrivedCount != count)
577  {
578  if (opts.qos == 2 || (opts.qos == 1 && arrivedCount < count))
579  {
581  "Workload test failed because the wrong number of messages"
582  " was received: %d whereas %d were expected",
583  arrivedCount, count);
584  rc = 0;
585  }
586  }
587  if (rc == 1)
588  control_send("verdict: pass");
589  else
590  control_send("verdict: fail");
591  return rc;
592 }
593 
594 
596 {
597  int lastreport = 0;
598  int wait_count = 0;
599  int limit = 120;
600 
601  MyLog(LOGA_ALWAYS, "Wait for completion");
602  if (opts.qos == 0)
603  limit = 30; /* we aren't going to get back all QoS 0 messages anyway */
604  MySleep(1000);
605  while (arrivedCount < expectedCount)
606  {
607  if (arrivedCount > lastreport)
608  {
609  MyLog(LOGA_ALWAYS, "%d messages arrived out of %d expected, in %d seconds",
610  arrivedCount, expectedCount, elapsed(start_time) / 1000);
611  lastreport = arrivedCount;
612  }
613  MySleep(1000);
614  if (opts.persistence && connection_lost)
616  if (++wait_count > limit || stopping)
617  break;
618  }
619  last_completion_time = elapsed(start_time) / 1000;
620  if (opts.qos > 0)
621  {
622  MyLog(LOGA_ALWAYS, "Extra wait to see if any duplicates arrive");
623  MySleep(10000); /* check if any duplicate messages arrive */
624  }
625  MyLog(LOGA_ALWAYS, "%d messages arrived out of %d expected, in %d seconds",
626  arrivedCount, expectedCount, elapsed(start_time) / 1000);
627  return success(expectedCount);
628 }
629 
630 int messagesSent = 0;
631 
633 {
634  messagesSent++;
635 }
636 
637 
638 void one_iteration(void)
639 {
640  int interval = 0;
641  int i = 0;
642  int seqno = 0;
643  int rc = 0;
644  START_TIME_TYPE start_time;
645  int last_expected_count = expectedCount;
646  int test_interval = 30;
647 
648  if (control_wait("start_measuring") == 0)
649  goto exit;
650 
651  connection_lost = 0;
652  recreated = 0;
653 
654  /* find the time for evaluation_count round-trip messages */
655  MyLog(LOGA_INFO, "Evaluating how many messages needed");
657  measuring = 1;
659  for (i = 1; i <= test_count; ++i)
660  {
661  char payload[128];
662 
663  sprintf(payload, "message number %d", i);
664 
665  rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
666  opts.qos, opts.retained, NULL);
667  while (rc != MQTTASYNC_SUCCESS)
668  {
669  if (opts.persistence && (connection_lost || rc == MQTTASYNC_DISCONNECTED))
671  if (stopping)
672  goto exit;
673  MySleep(1000);
674  rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
675  opts.qos, opts.retained, NULL);
676  while (seqno - messagesSent > 2000)
677  MySleep(1000);
678  }
679  }
680  MyLog(LOGA_INFO, "Messages sent... waiting for echoes");
681  while (arrivedCount < test_count)
682  {
683  if (opts.persistence && connection_lost)
685  if (stopping)
686  goto exit;
687  MySleep(1000);
688  MyLog(LOGA_ALWAYS, "arrivedCount %d", arrivedCount);
689  }
690  measuring = 0;
691 
692  /* Now set a target of 30 seconds total round trip */
693  if (1) //last_completion_time == -1)
694  {
695  MyLog(LOGA_ALWAYS, "Round trip time for %d messages is %d ms", test_count, roundtrip_time);
696  // test_count messages in 3039 ms: (test_interval * 1000) / roundtrip_time * test_count
697  //expectedCount = 1000 * test_count * test_interval / roundtrip_time / 2;
698  expectedCount = (test_interval * 1000) / roundtrip_time * test_count;
699  }
700  else
701  {
702  MyLog(LOGA_ALWAYS, "Last time, %d messages took %d s.", last_expected_count, last_completion_time);
703  expectedCount = last_expected_count * test_interval / last_completion_time;
704  }
705  MyLog(LOGA_ALWAYS, "Therefore %d messages needed for 30 seconds", expectedCount);
706 
707  if (control_wait("start_test") == 0) /* now synchronize the test interval */
708  goto exit;
709 
710  MyLog(LOGA_ALWAYS, "Starting 30 second test run with %d messages", expectedCount);
711  arrivedCount = 0;
712  messagesSent = 0;
713  start_time = start_clock();
714  while (seqno < expectedCount)
715  {
717  char payload[128];
718 
719  ropts.onSuccess = messageSent;
720  seqno++;
721  sprintf(payload, "message number %d", seqno);
722  rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
723  opts.qos, opts.retained, &ropts);
724  while (rc != MQTTASYNC_SUCCESS)
725  {
726  MyLog(LOGA_INFO, "Rc %d from publish with payload %s, retrying", rc, payload);
727  if (opts.persistence && (connection_lost || rc == MQTTASYNC_DISCONNECTED))
729  if (stopping)
730  goto exit;
731  MySleep(1000);
732  rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
733  opts.qos, opts.retained, &ropts);
734  }
735  //MyLog(LOGA_DEBUG, "Successful publish with payload %s", payload);
736  //while (seqno - messagesSent > 2000)
737  //{
738  //if (opts.persistence && (connection_lost || rc == MQTTASYNC_DISCONNECTED))
739  // recreateReconnect();
740  //}
741  // MySleep(1000);
742  }
743  MyLog(LOGA_ALWAYS, "%d messages sent in %d seconds", expectedCount, elapsed(start_time) / 1000);
744 
745  waitForCompletion(start_time);
746  control_wait("test finished");
747 exit:
748  ; /* dummy statement for target of exit */
749 }
750 
751 
752 static int client_subscribed = 0;
753 
755 {
756  MQTTAsync c = (MQTTAsync)context;
757 
758  MyLog(LOGA_DEBUG, "In client subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
759 
760  client_subscribed = 1;
761 }
762 
764 {
765  MQTTAsync c = (MQTTAsync)context;
766  MyLog(LOGA_INFO, "In failure callback");
767 
768  client_subscribed = -1;
769 }
770 
771 
773 {
774  MQTTAsync c = (MQTTAsync)context;
776  int rc;
777 
778  sprintf(sub_topic, "%s/send", opts.control_topic);
779  sprintf(pub_topic, "%s/receive", opts.control_topic);
780  ropts.context = context;
782  ropts.onFailure = client_onFailure;
783  ropts.context = c;
784  if ((rc = MQTTAsync_subscribe(c, opts.topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS)
785  {
786  MyLog(LOGA_ALWAYS, "client MQTTAsync_subscribe failed, rc %d", rc);
787  client_subscribed = -1;
788  }
789 }
790 
791 
793 {
794  client_cleaned = 1;
795 }
796 
797 
799 {
800  MQTTAsync c = (MQTTAsync)context;
802  int rc;
803 
804  dopts.context = context;
806  dopts.onFailure = client_onFailure;
807  dopts.context = c;
808  if ((rc = MQTTAsync_disconnect(c, &dopts)) != MQTTASYNC_SUCCESS)
809  {
810  MyLog(LOGA_ALWAYS, "client MQTTAsync_disconnect failed, rc %d", rc);
811  stopping = 1;
812  }
813 }
814 
815 
816 int sendAndReceive(void)
817 {
818  int rc = 0;
820 
821  MyLog(LOGA_ALWAYS, "v3 async C client topic workload using QoS %d", opts.qos);
822  MyLog(LOGA_DEBUG, "Connecting to %s", opts.connection);
823 
824  if (opts.persistence)
825  persistence = MQTTCLIENT_PERSISTENCE_DEFAULT;
826 
827  rc = MQTTAsync_create(&client, opts.connection, opts.clientid, persistence, NULL);
828  if (rc != MQTTASYNC_SUCCESS)
829  {
830  MyLog(LOGA_ALWAYS, "MQTTAsync_create failed, rc %d", rc);
831  rc = 99;
832  goto exit;
833  }
834 
837  {
838  MyLog(LOGA_ALWAYS, "MQTTAsync_setCallbacks failed, rc %d", rc);
839  rc = 99;
840  goto destroy_exit;
841  }
842 
843  /* wait to know that the controlling process is running before connecting to the SUT */
844  if (control_wait("who is ready?") == 0)
845  {
846  MyLog(LOGA_ALWAYS, "Wait for controller failed");
847  goto exit;
848  }
849 
850  /* connect cleansession, and then disconnect, to clean up */
851  conn_opts.keepAliveInterval = 10;
852  conn_opts.username = opts.username;
853  conn_opts.password = opts.password;
854  conn_opts.cleansession = 1;
855  conn_opts.context = client;
856  conn_opts.onSuccess = client_onCleaned;
857  conn_opts.onFailure = client_onFailure;
858  if (opts.connections)
859  {
860  conn_opts.serverURIcount = opts.connection_count;
861  conn_opts.serverURIs = opts.connections;
862  }
863  else
864  {
865  conn_opts.serverURIcount = 0;
866  conn_opts.serverURIs = NULL;
867  }
868  if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
869  {
870  MyLog(LOGA_ALWAYS, "MQTTAsync_connect failed, rc %d", rc);
871  rc = 99;
872  goto destroy_exit;
873  }
874 
875  while (client_cleaned == 0)
876  MySleep(1000);
877 
878  MyLog(LOGA_ALWAYS, "Client state cleaned up");
879 
880  conn_opts.cleansession = 0;
881  conn_opts.context = client;
882  conn_opts.onSuccess = client_onConnect;
883  conn_opts.onFailure = client_onFailure;
884  conn_opts.retryInterval = 1;
885  if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
886  {
887  MyLog(LOGA_ALWAYS, "MQTTAsync_connect failed, rc %d", rc);
888  rc = 99;
889  goto destroy_exit;
890  }
891 
892  /* wait until subscribed */
893  while (client_subscribed == 0)
894  MySleep(1000);
895 
896  if (client_subscribed != 1)
897  goto disconnect_exit;
898 
899  while (1)
900  {
901  control_send("Ready");
902  if (control_which("who is ready?", "continue") == 2)
903  break;
904  control_send("Ready");
905  }
906 
907  while (!stopping)
908  {
909  one_iteration();
910  }
911 
912 disconnect_exit:
914 
915 destroy_exit:
917 
918 exit:
919  return rc;
920 }
921 
922 
923 static int control_subscribed = 0;
924 
926 {
927  MQTTAsync c = (MQTTAsync)context;
928 
929  MyLog(LOGA_DEBUG, "In control subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
930 
931  control_subscribed = 1;
932  MyLog(LOGA_ALWAYS, "Connected and subscribed to control connection");
933 }
934 
936 {
937  MQTTAsync c = (MQTTAsync)context;
938 
939  control_subscribed = -1;
940 }
941 
942 
944 {
945  MQTTAsync c = (MQTTAsync)context;
947  int rc;
948 
949  sprintf(sub_topic, "%s/send", opts.control_topic);
950  sprintf(pub_topic, "%s/receive", opts.control_topic);
953  ropts.context = c;
954  MyLog(LOGA_ALWAYS, "Subscribing to control topic %s", sub_topic);
955  if ((rc = MQTTAsync_subscribe(c, sub_topic, 2, &ropts)) != MQTTASYNC_SUCCESS)
956  {
957  MyLog(LOGA_ALWAYS, "control MQTTAsync_subscribe failed, rc %d", rc);
958  control_subscribed = -1;
959  }
960 }
961 
962 void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
963 {
964  //if (level == MQTTASYNC_TRACE_ERROR || strstr(message, "Connect") || strstr(message, "failed"))
965  printf("Trace : %d, %s\n", level, message);
966 }
967 
968 int main(int argc, char** argv)
969 {
971  int rc = 0;
972  static char topic_buf[200];
973  static char clientid[40];
974 
975 #if !defined(_WIN32)
976  signal(SIGPIPE, SIG_IGN);
977 #endif
978 
980 
981  while (info->name)
982  {
983  MyLog(LOGA_ALWAYS, "%s: %s\n", info->name, info->value);
984  info++;
985  }
986 
987  getopts(argc, argv);
988 
989  sprintf(topic_buf, "%s_%d", opts.topic, opts.slot_no);
990  opts.topic = topic_buf;
991 
992  sprintf(clientid, "%s_%d", opts.clientid, opts.slot_no);
993  opts.clientid = clientid;
994 
995  MyLog(LOGA_ALWAYS, "Starting with clientid %s", opts.clientid);
996 
999 
1000  rc = MQTTAsync_create(&control_client, opts.control_connection,
1001  opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
1002  if (rc != MQTTASYNC_SUCCESS)
1003  {
1004  MyLog(LOGA_ALWAYS, "control MQTTAsync_create failed, rc %d", rc);
1005  rc = 99;
1006  goto exit;
1007  }
1008 
1011  {
1012  MyLog(LOGA_ALWAYS, "control MQTTAsync_setCallbacks failed, rc %d", rc);
1013  rc = 99;
1014  goto destroy_exit;
1015  }
1016 
1017  control_subscribed = 0;
1018  control_conn_opts.context = control_client;
1019  control_conn_opts.keepAliveInterval = 10;
1020  control_conn_opts.onSuccess = control_onConnect;
1021  control_conn_opts.onFailure = control_onFailure;
1022  if ((rc = MQTTAsync_connect(control_client, &control_conn_opts))
1023  != MQTTASYNC_SUCCESS)
1024  {
1025  MyLog(LOGA_ALWAYS, "control MQTTAsync_connect failed, rc %d", rc);
1026  rc = 99;
1027  goto destroy_exit;
1028  }
1029 
1030  while (control_subscribed == 0)
1031  MySleep(1000);
1032 
1033  if (control_subscribed != 1)
1034  goto destroy_exit;
1035 
1036  sendAndReceive();
1037 
1038 exit:
1040 
1041 destroy_exit:
1043 
1044  return 0;
1045 }
void client_onFailure(void *context, MQTTAsync_failureData *response)
Definition: test6.c:763
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
errors
Definition: MQTTPacket.h:37
long roundtrip_time
Definition: test6.c:280
long last_completion_time
Definition: test6.c:290
void control_connectionLost(void *context, char *cause)
Definition: test6.c:293
int success(int count)
Definition: test6.c:567
void client_onCleanedDisconnected(void *context, MQTTAsync_successData *response)
Definition: test6.c:792
int recreateReconnect(void)
Definition: test6.c:517
int recreated
Definition: test6.c:284
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
Definition: core.h:2081
void client_onConnect(void *context, MQTTAsync_successData *response)
Definition: test6.c:772
char * topic
Definition: test6.c:52
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m)
Definition: test6.c:402
const char * name
Definition: MQTTAsync.h:1149
void client_onReconnectFailure(void *context, MQTTAsync_failureData *response)
Definition: test6.c:452
union MQTTAsync_successData::@46 alt
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
static int control_subscribed
Definition: test6.c:923
static char sub_topic[200]
Definition: test6.c:44
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
int main(int argc, char **argv)
Definition: test6.c:968
MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
Definition: MQTTAsync.c:4909
void * MQTTAsync
Definition: MQTTAsync.h:239
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
Definition: chrono.h:375
void MQTTAsync_free(void *memory)
Definition: MQTTAsync.c:2626
#define malloc(x)
Definition: Heap.h:41
void MQTTAsync_freeMessage(MQTTAsync_message **message)
Definition: MQTTAsync.c:2615
#define MQTTASYNC_DISCONNECTED
Definition: MQTTAsync.h:127
int verbose
Definition: test6.c:60
void usage(void)
Definition: test6.c:37
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
Definition: MQTTAsync.c:4903
void client_onCleaned(void *context, MQTTAsync_successData *response)
Definition: test6.c:798
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
#define LOGA_INFO
Definition: test6.c:172
MQTTASYNC_TRACE_LEVELS
Definition: MQTTAsync.h:1650
static char msg_buf[512]
Definition: Log.c:122
int control_which(char *message1, char *message2)
Definition: test6.c:383
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4121
void getopts(int argc, char **argv)
Definition: test6.c:80
constexpr size_t count()
Definition: core.h:960
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTAsync.c:737
char * username
Definition: test6.c:58
char * wait_message
Definition: test6.c:287
long elapsed(START_TIME_TYPE start_time)
Definition: test6.c:264
MQTTAsync control_client
Definition: test6.c:274
int slot_no
Definition: test6.c:55
#define MQTTAsync_disconnectOptions_initializer
Definition: MQTTAsync.h:1422
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:702
void connectionLost(void *context, char *cause)
Definition: test6.c:475
#define LOGA_ALWAYS
Definition: test6.c:171
void messageSent(void *context, MQTTAsync_successData *response)
Definition: test6.c:632
int stopping
Definition: test6.c:282
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
Definition: test6.c:962
int qos
Definition: test6.c:56
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1393
char * wait_message2
Definition: test6.c:288
int messagesSent
Definition: test6.c:630
void MyLog(int LOGA_level, char *format,...)
Definition: test6.c:176
#define MQTTAsync_connectOptions_initializer
Definition: MQTTAsync.h:1335
#define LOGA_DEBUG
Definition: test6.c:170
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
int expectedCount
Definition: test6.c:278
int waitForCompletion(START_TIME_TYPE start_time)
Definition: test6.c:595
int control_send(char *message)
Definition: test6.c:338
int control_found
Definition: test6.c:289
int persistence
Definition: test6.c:61
#define MQTTCLIENT_PERSISTENCE_DEFAULT
MQTTAsync client
Definition: test6.c:276
int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen, const void *payload, int qos, int retained, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4230
void control_onConnect(void *context, MQTTAsync_successData *response)
Definition: test6.c:943
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
const char * value
Definition: MQTTAsync.h:1150
MQTTAsync_connectOptions conn_opts
Definition: test6.c:275
int measuring
Definition: test6.c:279
static char pub_topic[200]
Definition: test6.c:43
void client_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test6.c:754
int sendAndReceive(void)
Definition: test6.c:816
char * connection
Definition: test6.c:48
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
Definition: MQTTAsync.c:4897
MQTTClient c
Definition: test10.c:1656
int retained
Definition: test6.c:57
dictionary context
Definition: test2.py:57
#define START_TIME_TYPE
Definition: test6.c:238
char * clientid
Definition: test6.c:54
int test_count
Definition: test6.c:291
null localtime_s(...)
Definition: chrono.h:286
void one_iteration(void)
Definition: test6.c:638
#define MQTTCLIENT_PERSISTENCE_NONE
int control_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m)
Definition: test6.c:308
char ** connections
Definition: test6.c:49
void MySleep(long milliseconds)
Definition: test6.c:213
START_TIME_TYPE global_start_time
Definition: test6.c:400
void client_onReconnect(void *context, MQTTAsync_successData *response)
Definition: test6.c:444
char * password
Definition: test6.c:59
int arrivedCount
Definition: test6.c:277
void control_onSubscribe(void *context, MQTTAsync_successData *response)
Definition: test6.c:925
int connection_lost
Definition: test6.c:283
int client_cleaned
Definition: test6.c:285
char *const * serverURIs
Definition: MQTTAsync.h:1277
enum MQTTReasonCodes rc
Definition: test10.c:1112
char * control_connection
Definition: test6.c:51
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
struct @89 opts
int control_wait(char *message)
Definition: test6.c:355
static int client_subscribed
Definition: test6.c:752
void control_onFailure(void *context, MQTTAsync_failureData *response)
Definition: test6.c:935
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1387
int connection_count
Definition: test6.c:50
START_TIME_TYPE start_clock(void)
Definition: test6.c:240
char * control_topic
Definition: test6.c:53


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 04:02:48