paho_c_sub.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2012, 2019 IBM Corp., and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial contribution
15  * Ian Craggs - fix for bug 413429 - connectionLost not called
16  * Guilherme Maciel Ferreira - add keep alive option
17  * Ian Craggs - add full capability
18  *******************************************************************************/
19 
20 #include "MQTTAsync.h"
21 #include "MQTTClientPersistence.h"
22 #include "pubsub_opts.h"
23 
24 #include <stdio.h>
25 #include <signal.h>
26 #include <string.h>
27 #include <stdlib.h>
28 
29 
30 #if defined(_WIN32)
31 #include <windows.h>
32 #define sleep Sleep
33 #else
34 #include <sys/time.h>
35 #include <unistd.h>
36 #endif
37 
38 #if defined(_WRS_KERNEL)
39 #include <OsWrapper.h>
40 #endif
41 
42 volatile int finished = 0;
43 int subscribed = 0;
44 int disconnected = 0;
45 
46 
47 void mysleep(int ms)
48 {
49  #if defined(_WIN32)
50  Sleep(ms);
51  #else
52  usleep(ms * 1000);
53  #endif
54 }
55 
56 void cfinish(int sig)
57 {
58  signal(SIGINT, NULL);
59  finished = 1;
60 }
61 
62 
63 struct pubsub_opts opts =
64 {
65  0, 0, 0, 0, "\n", 100, /* debug/app options */
66  NULL, NULL, 1, 0, 0, /* message options */
67  MQTTVERSION_DEFAULT, NULL, "paho-c-sub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
68  NULL, NULL, 0, 0, /* will options */
69  0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
70  0, {NULL, NULL}, /* MQTT V5 options */
71 };
72 
73 
74 int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
75 {
76  size_t delimlen = 0;
77 
78  if (opts.verbose)
79  printf("%d %s\t", message->payloadlen, topicName);
80  if (opts.delimiter)
81  delimlen = strlen(opts.delimiter);
82  if (opts.delimiter == NULL || (message->payloadlen > delimlen &&
83  strncmp(opts.delimiter, &((char*)message->payload)[message->payloadlen - delimlen], delimlen) == 0))
84  printf("%.*s", message->payloadlen, (char*)message->payload);
85  else
86  printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter);
87  if (message->struct_version == 1 && opts.verbose)
88  logProperties(&message->properties);
89  fflush(stdout);
90  MQTTAsync_freeMessage(&message);
91  MQTTAsync_free(topicName);
92  return 1;
93 }
94 
95 
97 {
98  disconnected = 1;
99 }
100 
101 
103 {
104  subscribed = 1;
105 }
106 
108 {
109  subscribed = 1;
110 }
111 
112 
114 {
115  if (!opts.quiet)
116  fprintf(stderr, "Subscribe failed, rc %s reason code %s\n",
117  MQTTAsync_strerror(response->code),
119  finished = 1;
120 }
121 
122 
124 {
125  if (!opts.quiet)
126  fprintf(stderr, "Subscribe failed, rc %s\n",
127  MQTTAsync_strerror(response->code));
128  finished = 1;
129 }
130 
131 
133 {
134  if (!opts.quiet)
135  fprintf(stderr, "Connect failed, rc %s reason code %s\n",
136  MQTTAsync_strerror(response->code),
138  finished = 1;
139 }
140 
141 
143 {
144  if (!opts.quiet)
145  fprintf(stderr, "Connect failed, rc %s\n", response ? MQTTAsync_strerror(response->code) : "none");
146  finished = 1;
147 }
148 
149 
151 {
152  MQTTAsync client = (MQTTAsync)context;
154  int rc;
155 
156  if (opts.verbose)
157  printf("Subscribing to topic %s with client %s at QoS %d\n", opts.topic, opts.clientid, opts.qos);
158 
159  copts.onSuccess5 = onSubscribe5;
161  copts.context = client;
162  if ((rc = MQTTAsync_subscribe(client, opts.topic, opts.qos, &copts)) != MQTTASYNC_SUCCESS)
163  {
164  if (!opts.quiet)
165  fprintf(stderr, "Failed to start subscribe, return code %s\n", MQTTAsync_strerror(rc));
166  finished = 1;
167  }
168 }
169 
170 
172 {
173  MQTTAsync client = (MQTTAsync)context;
175  int rc;
176 
177  if (opts.verbose)
178  printf("Subscribing to topic %s with client %s at QoS %d\n", opts.topic, opts.clientid, opts.qos);
179 
180  ropts.onSuccess = onSubscribe;
182  ropts.context = client;
183  if ((rc = MQTTAsync_subscribe(client, opts.topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS)
184  {
185  if (!opts.quiet)
186  fprintf(stderr, "Failed to start subscribe, return code %s\n", MQTTAsync_strerror(rc));
187  finished = 1;
188  }
189 }
190 
192 
193 
195 {
196  fprintf(stderr, "Trace : %d, %s\n", level, message);
197 }
198 
199 
200 int main(int argc, char** argv)
201 {
207  int rc = 0;
208  char* url = NULL;
209  const char* version = NULL;
210  const char* program_name = "paho_c_sub";
212 #if !defined(_WIN32)
213  struct sigaction sa;
214 #endif
215 
216  if (argc < 2)
217  usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
218 
219  if (getopts(argc, argv, &opts) != 0)
220  usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
221 
222  if (strchr(opts.topic, '#') || strchr(opts.topic, '+'))
223  opts.verbose = 1;
224 
225  if (opts.connection)
226  url = opts.connection;
227  else
228  {
229  url = malloc(100);
230  sprintf(url, "%s:%s", opts.host, opts.port);
231  }
232  if (opts.verbose)
233  printf("URL is %s\n", url);
234 
235  if (opts.tracelevel > 0)
236  {
239  }
240 
241  if (opts.MQTTVersion >= MQTTVERSION_5)
242  create_opts.MQTTVersion = MQTTVERSION_5;
244  NULL, &create_opts);
245  if (rc != MQTTASYNC_SUCCESS)
246  {
247  if (!opts.quiet)
248  fprintf(stderr, "Failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
249  exit(EXIT_FAILURE);
250  }
251 
252  rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
253  if (rc != MQTTASYNC_SUCCESS)
254  {
255  if (!opts.quiet)
256  fprintf(stderr, "Failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc));
257  exit(EXIT_FAILURE);
258  }
259 
260 #if defined(_WIN32)
261  signal(SIGINT, cfinish);
262  signal(SIGTERM, cfinish);
263 #else
264  memset(&sa, 0, sizeof(struct sigaction));
265  sa.sa_handler = cfinish;
266  sa.sa_flags = 0;
267 
268  sigaction(SIGINT, &sa, NULL);
269  sigaction(SIGTERM, &sa, NULL);
270 #endif
271 
272  if (opts.MQTTVersion == MQTTVERSION_5)
273  {
275  conn_opts = conn_opts5;
276  conn_opts.onSuccess5 = onConnect5;
277  conn_opts.onFailure5 = onConnectFailure5;
278  conn_opts.cleanstart = 1;
279  }
280  else
281  {
282  conn_opts.onSuccess = onConnect;
283  conn_opts.onFailure = onConnectFailure;
284  conn_opts.cleansession = 1;
285  }
286  conn_opts.keepAliveInterval = opts.keepalive;
287  conn_opts.username = opts.username;
288  conn_opts.password = opts.password;
289  conn_opts.MQTTVersion = opts.MQTTVersion;
290  conn_opts.context = client;
291  conn_opts.automaticReconnect = 1;
292 
293  if (opts.will_topic) /* will options */
294  {
295  will_opts.message = opts.will_payload;
296  will_opts.topicName = opts.will_topic;
297  will_opts.qos = opts.will_qos;
298  will_opts.retained = opts.will_retain;
299  conn_opts.will = &will_opts;
300  }
301 
302  if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
303  strncmp(opts.connection, "wss://", 6) == 0))
304  {
305  ssl_opts.verify = (opts.insecure) ? 0 : 1;
306  ssl_opts.CApath = opts.capath;
307  ssl_opts.keyStore = opts.cert;
308  ssl_opts.trustStore = opts.cafile;
309  ssl_opts.privateKey = opts.key;
310  ssl_opts.privateKeyPassword = opts.keypass;
311  ssl_opts.enabledCipherSuites = opts.ciphers;
312  conn_opts.ssl = &ssl_opts;
313  }
314 
315  if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
316  {
317  if (!opts.quiet)
318  fprintf(stderr, "Failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
319  exit(EXIT_FAILURE);
320  }
321 
322  while (!subscribed)
323  mysleep(100);
324 
325  if (finished)
326  goto exit;
327 
328  while (!finished)
329  mysleep(100);
330 
331  disc_opts.onSuccess = onDisconnect;
332  if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
333  {
334  if (!opts.quiet)
335  fprintf(stderr, "Failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
336  exit(EXIT_FAILURE);
337  }
338 
339  while (!disconnected)
340  mysleep(100);
341 
342 exit:
343  MQTTAsync_destroy(&client);
344 
345  return EXIT_SUCCESS;
346 }
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
MQTTProperties properties
Definition: MQTTAsync.h:316
char * topic
Definition: pubsub_opts.h:41
const char * keyStore
Definition: MQTTAsync.h:1053
char * capath
Definition: pubsub_opts.h:58
char * message
Definition: pubsub_opts.h:34
void cfinish(int sig)
Definition: paho_c_sub.c:56
int MQTTAsync_createWithOptions(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTAsync_createOptions *options)
Definition: MQTTAsync.c:575
const char * trustStore
Definition: MQTTAsync.h:1048
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.h:726
char * connection
Definition: pubsub_opts.h:49
char * cert
Definition: pubsub_opts.h:59
void onSubscribeFailure(void *context, MQTTAsync_failureData *response)
Definition: paho_c_sub.c:123
char * will_topic
Definition: pubsub_opts.h:52
const char * message
Definition: MQTTAsync.h:996
const char * topicName
Definition: MQTTAsync.h:994
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
void onDisconnect(void *context, MQTTAsync_successData *response)
Definition: paho_c_sub.c:96
char * delimiter
Definition: pubsub_opts.h:31
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
const char * MQTTReasonCode_toString(enum MQTTReasonCodes value)
void usage(struct pubsub_opts *opts, pubsub_opts_nameValue *name_values, const char *program_name)
Definition: pubsub_opts.c:40
MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
Definition: MQTTAsync.c:4909
void * MQTTAsync
Definition: MQTTAsync.h:239
char * key
Definition: pubsub_opts.h:61
void onConnectFailure5(void *context, MQTTAsync_failureData5 *response)
Definition: paho_c_sub.c:132
void MQTTAsync_free(void *memory)
Definition: MQTTAsync.c:2626
#define malloc(x)
Definition: Heap.h:41
void MQTTAsync_freeMessage(MQTTAsync_message **message)
Definition: MQTTAsync.c:2615
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:720
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
Definition: MQTTAsync.c:4903
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
MQTTASYNC_TRACE_LEVELS
Definition: MQTTAsync.h:1650
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
Definition: paho_c_sub.c:194
MQTTAsync_connectOptions conn_opts
Definition: paho_c_sub.c:191
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4121
char * clientid
Definition: pubsub_opts.h:42
char * host
Definition: pubsub_opts.h:47
#define MQTTAsync_willOptions_initializer
Definition: MQTTAsync.h:1014
char * cafile
Definition: pubsub_opts.h:60
#define MQTTAsync_connectOptions_initializer5
Definition: MQTTAsync.h:1338
char * ciphers
Definition: pubsub_opts.h:63
This structure represents a persistent data store, used to store outbound and inbound messages...
void onConnectFailure(void *context, MQTTAsync_failureData *response)
Definition: paho_c_sub.c:142
#define MQTTAsync_disconnectOptions_initializer
Definition: MQTTAsync.h:1422
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:702
void onConnect5(void *context, MQTTAsync_successData5 *response)
Definition: paho_c_sub.c:150
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.h:1327
void onSubscribe5(void *context, MQTTAsync_successData5 *response)
Definition: paho_c_sub.c:102
#define MQTTAsync_createOptions_initializer
Definition: MQTTAsync.h:965
int subscribed
Definition: paho_c_sub.c:43
MQTTAsync_SSLOptions * ssl
Definition: MQTTAsync.h:1243
#define MQTTAsync_connectOptions_initializer
Definition: MQTTAsync.h:1335
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
int MQTTVersion
Definition: pubsub_opts.h:40
MQTTAsync_willOptions * will
Definition: MQTTAsync.h:1214
const char * privateKeyPassword
Definition: MQTTAsync.h:1061
const char * enabledCipherSuites
Definition: MQTTAsync.h:1071
version
Definition: setup.py:18
MQTTAsync client
Definition: test6.c:276
int will_retain
Definition: pubsub_opts.h:55
int main(int argc, char **argv)
Definition: paho_c_sub.c:200
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
void onConnect(void *context, MQTTAsync_successData *response)
Definition: paho_c_sub.c:171
char * keypass
Definition: pubsub_opts.h:62
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
Definition: MQTTAsync.c:4897
void logProperties(MQTTProperties *props)
Definition: pubsub_opts.c:466
dictionary context
Definition: test2.py:57
const char * privateKey
Definition: MQTTAsync.h:1058
#define MQTTCLIENT_PERSISTENCE_NONE
enum MQTTReasonCodes reasonCode
Definition: MQTTAsync.h:532
const char * MQTTAsync_strerror(int code)
Definition: MQTTAsync.c:4944
volatile int finished
Definition: paho_c_sub.c:42
#define MQTTAsync_SSLOptions_initializer
Definition: MQTTAsync.h:1144
void onSubscribeFailure5(void *context, MQTTAsync_failureData5 *response)
Definition: paho_c_sub.c:113
char * password
Definition: pubsub_opts.h:46
int disconnected
Definition: paho_c_sub.c:44
char * will_payload
Definition: pubsub_opts.h:53
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
Definition: paho_c_sub.c:74
int getopts(int argc, char **argv, struct pubsub_opts *opts)
Definition: pubsub_opts.c:128
int tracelevel
Definition: pubsub_opts.h:30
enum MQTTReasonCodes rc
Definition: test10.c:1112
const char * CApath
Definition: MQTTAsync.h:1094
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
#define MQTTAsync_callOptions_initializer
Definition: MQTTAsync.h:750
void mysleep(int ms)
Definition: paho_c_sub.c:47
#define MQTTVERSION_DEFAULT
Definition: MQTTAsync.h:195
struct pubsub_opts opts
Definition: paho_c_sub.c:63
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:1321
char * port
Definition: pubsub_opts.h:48
void onSubscribe(void *context, MQTTAsync_successData *response)
Definition: paho_c_sub.c:107
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1387
char * username
Definition: pubsub_opts.h:45


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:10