paho_c_pub.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2012, 2020 IBM Corp., and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial contribution
15  * Guilherme Maciel Ferreira - add keep alive option
16  * Ian Craggs - add full capability
17  *******************************************************************************/
18 
19 #include "MQTTAsync.h"
20 #include "pubsub_opts.h"
21 
22 #include <stdio.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <stdlib.h>
26 
27 #if defined(_WIN32)
28 #include <windows.h>
29 #define sleep Sleep
30 #else
31 #include <unistd.h>
32 #include <sys/time.h>
33 #include <unistd.h>
34 #endif
35 
36 #if defined(_WRS_KERNEL)
37 #include <OsWrapper.h>
38 #endif
39 
40 volatile int toStop = 0;
41 
42 struct pubsub_opts opts =
43 {
44  1, 0, 0, 0, "\n", 100, /* debug/app options */
45  NULL, NULL, 1, 0, 0, /* message options */
46  MQTTVERSION_DEFAULT, NULL, "paho-c-pub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
47  NULL, NULL, 0, 0, /* will options */
48  0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
49  0, {NULL, NULL}, /* MQTT V5 options */
50 };
51 
55 
56 
57 void mysleep(int ms)
58 {
59  #if defined(_WIN32)
60  Sleep(ms);
61  #else
62  usleep(ms * 1000);
63  #endif
64 }
65 
66 void cfinish(int sig)
67 {
68  signal(SIGINT, NULL);
69  toStop = 1;
70 }
71 
72 
73 int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
74 {
75  /* not expecting any messages */
76  return 1;
77 }
78 
79 
80 static int disconnected = 0;
81 
83 {
84  disconnected = 1;
85 }
86 
88 {
89  disconnected = 1;
90 }
91 
92 
93 static int connected = 0;
95 int mypublish(MQTTAsync client, int datalen, char* data);
96 
98 {
99  fprintf(stderr, "Connect failed, rc %s reason code %s\n",
100  MQTTAsync_strerror(response->code),
102  connected = -1;
103 
104  MQTTAsync client = (MQTTAsync)context;
105 }
106 
108 {
109  fprintf(stderr, "Connect failed, rc %s\n", response ? MQTTAsync_strerror(response->code) : "none");
110  connected = -1;
111 
112  MQTTAsync client = (MQTTAsync)context;
113 }
114 
115 
117 {
118  MQTTAsync client = (MQTTAsync)context;
119  int rc = 0;
120 
121  if (opts.verbose)
122  printf("Connected\n");
123 
124  if (opts.null_message == 1)
125  rc = mypublish(client, 0, "");
126  else if (opts.message)
127  rc = mypublish(client, (int)strlen(opts.message), opts.message);
128  else if (opts.filename)
129  {
130  int data_len = 0;
131  char* buffer = readfile(&data_len, &opts);
132 
133  if (buffer == NULL)
134  toStop = 1;
135  else
136  {
137  rc = mypublish(client, data_len, buffer);
138  free(buffer);
139  }
140  }
141 
142  connected = 1;
143 }
144 
146 {
147  MQTTAsync client = (MQTTAsync)context;
148  int rc = 0;
149 
150  if (opts.verbose)
151  printf("Connected\n");
152 
153  if (opts.null_message == 1)
154  rc = mypublish(client, 0, "");
155  else if (opts.message)
156  rc = mypublish(client, (int)strlen(opts.message), opts.message);
157  else if (opts.filename)
158  {
159  int data_len = 0;
160  char* buffer = readfile(&data_len, &opts);
161 
162  if (buffer == NULL)
163  toStop = 1;
164  else
165  {
166  rc = mypublish(client, data_len, buffer);
167  free(buffer);
168  }
169  }
170 
171  connected = 1;
172 }
173 
174 
175 static int published = 0;
176 
178 {
179  if (opts.verbose)
180  fprintf(stderr, "Publish failed, rc %s reason code %s\n",
181  MQTTAsync_strerror(response->code),
183  published = -1;
184 }
185 
187 {
188  if (opts.verbose)
189  fprintf(stderr, "Publish failed, rc %s\n", MQTTAsync_strerror(response->code));
190  published = -1;
191 }
192 
193 
195 {
196  if (opts.verbose)
197  printf("Publish succeeded, reason code %s\n",
199 
200  if (opts.null_message || opts.message || opts.filename)
201  toStop = 1;
202 
203  published = 1;
204 }
205 
206 
208 {
209  if (opts.verbose)
210  printf("Publish succeeded\n");
211 
212  if (opts.null_message || opts.message || opts.filename)
213  toStop = 1;
214 
215  published = 1;
216 }
217 
218 
219 static int onSSLError(const char *str, size_t len, void *context)
220 {
221  MQTTAsync client = (MQTTAsync)context;
222  return fprintf(stderr, "SSL error: %s\n", str);
223 }
224 
225 static unsigned int onPSKAuth(const char* hint,
226  char* identity,
227  unsigned int max_identity_len,
228  unsigned char* psk,
229  unsigned int max_psk_len,
230  void* context)
231 {
232  int psk_len;
233  int k, n;
234 
235  int rc = 0;
236  struct pubsub_opts* opts = context;
237 
238  /* printf("Trying TLS-PSK auth with hint: %s\n", hint);*/
239 
240  if (opts->psk == NULL || opts->psk_identity == NULL)
241  {
242  /* printf("No PSK entered\n"); */
243  goto exit;
244  }
245 
246  /* psk should be array of bytes. This is a quick and dirty way to
247  * convert hex to bytes without input validation */
248  psk_len = (int)strlen(opts->psk) / 2;
249  if (psk_len > max_psk_len)
250  {
251  fprintf(stderr, "PSK too long\n");
252  goto exit;
253  }
254  for (k=0, n=0; k < psk_len; k++, n += 2)
255  {
256  sscanf(&opts->psk[n], "%2hhx", &psk[k]);
257  }
258 
259  /* identity should be NULL terminated string */
260  strncpy(identity, opts->psk_identity, max_identity_len);
261  if (identity[max_identity_len - 1] != '\0')
262  {
263  fprintf(stderr, "Identity too long\n");
264  goto exit;
265  }
266 
267  /* Function should return length of psk on success. */
268  rc = psk_len;
269 
270 exit:
271  return rc;
272 }
273 
275 {
279  int rc = 0;
280 
281  if (opts.verbose)
282  printf("Connecting\n");
283  if (opts.MQTTVersion == MQTTVERSION_5)
284  {
286  conn_opts = conn_opts5;
287  conn_opts.onSuccess5 = onConnect5;
288  conn_opts.onFailure5 = onConnectFailure5;
289  conn_opts.cleanstart = 1;
290  }
291  else
292  {
293  conn_opts.onSuccess = onConnect;
294  conn_opts.onFailure = onConnectFailure;
295  conn_opts.cleansession = 1;
296  }
297  conn_opts.keepAliveInterval = opts.keepalive;
298  conn_opts.username = opts.username;
299  conn_opts.password = opts.password;
300  conn_opts.MQTTVersion = opts.MQTTVersion;
301  conn_opts.context = client;
302  conn_opts.automaticReconnect = 1;
303 
304  if (opts.will_topic) /* will options */
305  {
306  will_opts.message = opts.will_payload;
307  will_opts.topicName = opts.will_topic;
308  will_opts.qos = opts.will_qos;
309  will_opts.retained = opts.will_retain;
310  conn_opts.will = &will_opts;
311  }
312 
313  if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
314  strncmp(opts.connection, "wss://", 6) == 0))
315  {
316  if (opts.insecure)
317  ssl_opts.verify = 0;
318  else
319  ssl_opts.verify = 1;
320  ssl_opts.CApath = opts.capath;
321  ssl_opts.keyStore = opts.cert;
322  ssl_opts.trustStore = opts.cafile;
323  ssl_opts.privateKey = opts.key;
324  ssl_opts.privateKeyPassword = opts.keypass;
325  ssl_opts.enabledCipherSuites = opts.ciphers;
326  ssl_opts.ssl_error_cb = onSSLError;
327  ssl_opts.ssl_error_context = client;
328  ssl_opts.ssl_psk_cb = onPSKAuth;
329  ssl_opts.ssl_psk_context = &opts;
330  conn_opts.ssl = &ssl_opts;
331  }
332 
333  connected = 0;
334  if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
335  {
336  fprintf(stderr, "Failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
337  exit(EXIT_FAILURE);
338  }
339 }
340 
341 
342 int mypublish(MQTTAsync client, int datalen, char* data)
343 {
344  int rc;
345 
346  if (opts.verbose)
347  printf("Publishing data of length %d\n", datalen);
348 
349  rc = MQTTAsync_send(client, opts.topic, datalen, data, opts.qos, opts.retained, &pub_opts);
350  if (opts.verbose && rc != MQTTASYNC_SUCCESS && !opts.quiet)
351  fprintf(stderr, "Error from MQTTAsync_send: %s\n", MQTTAsync_strerror(rc));
352 
353  return rc;
354 }
355 
356 
358 {
359  fprintf(stderr, "Trace : %d, %s\n", level, message);
360 }
361 
362 
363 int main(int argc, char** argv)
364 {
368  char* buffer = NULL;
369  char* url = NULL;
370  int url_allocated = 0;
371  int rc = 0;
372  const char* version = NULL;
373  const char* program_name = "paho_c_pub";
375 #if !defined(_WIN32)
376  struct sigaction sa;
377 #endif
378 
379  if (argc < 2)
380  usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
381 
382  if (getopts(argc, argv, &opts) != 0)
383  usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
384 
385  if (opts.connection)
386  url = opts.connection;
387  else
388  {
389  url = malloc(100);
390  url_allocated = 1;
391  sprintf(url, "%s:%s", opts.host, opts.port);
392  }
393  if (opts.verbose)
394  printf("URL is %s\n", url);
395 
396  if (opts.tracelevel > 0)
397  {
400  }
401 
402  create_opts.sendWhileDisconnected = 1;
403  if (opts.MQTTVersion >= MQTTVERSION_5)
404  create_opts.MQTTVersion = MQTTVERSION_5;
405  rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
406  if (rc != MQTTASYNC_SUCCESS)
407  {
408  if (!opts.quiet)
409  fprintf(stderr, "Failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
410  exit(EXIT_FAILURE);
411  }
412 
413 #if defined(_WIN32)
414  signal(SIGINT, cfinish);
415  signal(SIGTERM, cfinish);
416 #else
417  memset(&sa, 0, sizeof(struct sigaction));
418  sa.sa_handler = cfinish;
419  sa.sa_flags = 0;
420 
421  sigaction(SIGINT, &sa, NULL);
422  sigaction(SIGTERM, &sa, NULL);
423 #endif
424 
425  rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
426  if (rc != MQTTASYNC_SUCCESS)
427  {
428  if (!opts.quiet)
429  fprintf(stderr, "Failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc));
430  exit(EXIT_FAILURE);
431  }
432 
433  if (opts.MQTTVersion >= MQTTVERSION_5)
434  {
435  pub_opts.onSuccess5 = onPublish5;
436  pub_opts.onFailure5 = onPublishFailure5;
437 
438  if (opts.message_expiry > 0)
439  {
440  property.identifier = MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL;
441  property.value.integer4 = opts.message_expiry;
442  MQTTProperties_add(&props, &property);
443  }
444  if (opts.user_property.name)
445  {
446  property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
447  property.value.data.data = opts.user_property.name;
448  property.value.data.len = (int)strlen(opts.user_property.name);
449  property.value.value.data = opts.user_property.value;
450  property.value.value.len = (int)strlen(opts.user_property.value);
451  MQTTProperties_add(&props, &property);
452  }
453  pub_opts.properties = props;
454  }
455  else
456  {
457  pub_opts.onSuccess = onPublish;
458  pub_opts.onFailure = onPublishFailure;
459  }
460 
461  myconnect(client);
462 
463  while (!toStop)
464  {
465  int data_len = 0;
466  int delim_len = 0;
467 
468  if (opts.stdin_lines)
469  {
470  buffer = malloc(opts.maxdatalen);
471 
472  delim_len = (int)strlen(opts.delimiter);
473  do
474  {
475  buffer[data_len++] = getchar();
476  if (data_len > delim_len)
477  {
478  if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
479  break;
480  }
481  } while (data_len < opts.maxdatalen);
482 
483  rc = mypublish(client, data_len, buffer);
484  }
485  else
486  mysleep(100);
487  }
488 
489  if (opts.message == 0 && opts.null_message == 0 && opts.filename == 0)
490  free(buffer);
491 
492  if (opts.MQTTVersion >= MQTTVERSION_5)
493  disc_opts.onSuccess5 = onDisconnect5;
494  else
495  disc_opts.onSuccess = onDisconnect;
496  if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
497  {
498  if (!opts.quiet)
499  fprintf(stderr, "Failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
500  exit(EXIT_FAILURE);
501  }
502 
503  while (!disconnected)
504  mysleep(100);
505 
506  MQTTAsync_destroy(&client);
507 
508  if (url_allocated)
509  free(url);
510 
511  return EXIT_SUCCESS;
512 }
513 
514 
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
char * topic
Definition: pubsub_opts.h:41
const char * keyStore
Definition: MQTTAsync.h:1053
char * capath
Definition: pubsub_opts.h:58
char * message
Definition: pubsub_opts.h:34
int MQTTAsync_createWithOptions(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTAsync_createOptions *options)
Definition: MQTTAsync.c:575
const char * trustStore
Definition: MQTTAsync.h:1048
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.h:726
char * connection
Definition: pubsub_opts.h:49
MQTTProperties props
Definition: paho_c_pub.c:54
char * cert
Definition: pubsub_opts.h:59
char * will_topic
Definition: pubsub_opts.h:52
const char * message
Definition: MQTTAsync.h:996
const char * topicName
Definition: MQTTAsync.h:994
volatile int toStop
Definition: paho_c_pub.c:40
void onConnectFailure5(void *context, MQTTAsync_failureData5 *response)
Definition: paho_c_pub.c:97
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
static int published
Definition: paho_c_pub.c:175
char * readfile(int *data_len, struct pubsub_opts *opts)
Definition: pubsub_opts.c:430
char * delimiter
Definition: pubsub_opts.h:31
char * psk_identity
Definition: pubsub_opts.h:64
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
char * filename
Definition: pubsub_opts.h:35
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
Definition: paho_c_pub.c:357
int mypublish(MQTTAsync client, int datalen, char *data)
Definition: paho_c_pub.c:342
const char * MQTTReasonCode_toString(enum MQTTReasonCodes value)
char * name
Definition: pubsub_opts.h:69
struct pubsub_opts opts
Definition: paho_c_pub.c:42
void usage(struct pubsub_opts *opts, pubsub_opts_nameValue *name_values, const char *program_name)
Definition: pubsub_opts.c:40
int MQTTProperties_add(MQTTProperties *props, const MQTTProperty *prop)
MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
Definition: MQTTAsync.c:4909
void * MQTTAsync
Definition: MQTTAsync.h:239
char * key
Definition: pubsub_opts.h:61
MQTTAsync_responseOptions pub_opts
Definition: paho_c_pub.c:52
#define malloc(x)
Definition: Heap.h:41
enum MQTTReasonCodes reasonCode
Definition: MQTTAsync.h:583
void mysleep(int ms)
Definition: paho_c_pub.c:57
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:720
static int connected
Definition: paho_c_pub.c:93
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
Definition: MQTTAsync.c:4903
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
MQTTASYNC_TRACE_LEVELS
Definition: MQTTAsync.h:1650
MQTTAsync_connectOptions conn_opts
Definition: paho_c_sub.c:191
#define free(x)
Definition: Heap.h:55
static unsigned int onPSKAuth(const char *hint, char *identity, unsigned int max_identity_len, unsigned char *psk, unsigned int max_psk_len, void *context)
Definition: paho_c_pub.c:225
char * clientid
Definition: pubsub_opts.h:42
char * host
Definition: pubsub_opts.h:47
void cfinish(int sig)
Definition: paho_c_pub.c:66
#define MQTTAsync_willOptions_initializer
Definition: MQTTAsync.h:1014
char * cafile
Definition: pubsub_opts.h:60
#define MQTTAsync_connectOptions_initializer5
Definition: MQTTAsync.h:1338
void onPublishFailure5(void *context, MQTTAsync_failureData5 *response)
Definition: paho_c_pub.c:177
char * ciphers
Definition: pubsub_opts.h:63
#define MQTTAsync_disconnectOptions_initializer
Definition: MQTTAsync.h:1422
int message_expiry
Definition: pubsub_opts.h:67
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:702
MQTTAsync_onFailure5 * onFailure5
Definition: MQTTAsync.h:1327
void onDisconnect(void *context, MQTTAsync_successData *response)
Definition: paho_c_pub.c:87
void onPublish5(void *context, MQTTAsync_successData5 *response)
Definition: paho_c_pub.c:194
int main(int argc, char **argv)
Definition: paho_c_pub.c:363
unsigned int(* ssl_psk_cb)(const char *hint, char *identity, unsigned int max_identity_len, unsigned char *psk, unsigned int max_psk_len, void *u)
Definition: MQTTAsync.h:1113
void onDisconnect5(void *context, MQTTAsync_successData5 *response)
Definition: paho_c_pub.c:82
void onPublish(void *context, MQTTAsync_successData *response)
Definition: paho_c_pub.c:207
void onConnectFailure(void *context, MQTTAsync_failureData *response)
Definition: paho_c_pub.c:107
#define MQTTAsync_createOptions_initializer
Definition: MQTTAsync.h:965
MQTTAsync_SSLOptions * ssl
Definition: MQTTAsync.h:1243
#define MQTTAsync_connectOptions_initializer
Definition: MQTTAsync.h:1335
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
int MQTTVersion
Definition: pubsub_opts.h:40
void onConnect5(void *context, MQTTAsync_successData5 *response)
Definition: paho_c_pub.c:116
MQTTAsync_willOptions * will
Definition: MQTTAsync.h:1214
const char * privateKeyPassword
Definition: MQTTAsync.h:1061
const char * enabledCipherSuites
Definition: MQTTAsync.h:1071
version
Definition: setup.py:18
MQTTAsync client
Definition: test6.c:276
int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen, const void *payload, int qos, int retained, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4230
int will_retain
Definition: pubsub_opts.h:55
MQTTProperties properties
Definition: MQTTAsync.h:730
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
char * keypass
Definition: pubsub_opts.h:62
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
Definition: MQTTAsync.c:4897
dictionary context
Definition: test2.py:57
const char * privateKey
Definition: MQTTAsync.h:1058
static int disconnected
Definition: paho_c_pub.c:80
#define MQTTCLIENT_PERSISTENCE_NONE
char * value
Definition: pubsub_opts.h:70
enum MQTTReasonCodes reasonCode
Definition: MQTTAsync.h:532
const char * MQTTAsync_strerror(int code)
Definition: MQTTAsync.c:4944
static int onSSLError(const char *str, size_t len, void *context)
Definition: paho_c_pub.c:219
#define MQTTAsync_SSLOptions_initializer
Definition: MQTTAsync.h:1144
int stdin_lines
Definition: pubsub_opts.h:36
char * password
Definition: pubsub_opts.h:46
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:1413
void onPublishFailure(void *context, MQTTAsync_failureData *response)
Definition: paho_c_pub.c:186
char * will_payload
Definition: pubsub_opts.h:53
char * psk
Definition: pubsub_opts.h:65
dictionary data
Definition: mqtt_test.py:22
int getopts(int argc, char **argv, struct pubsub_opts *opts)
Definition: pubsub_opts.c:128
int tracelevel
Definition: pubsub_opts.h:30
MQTTProperty property
Definition: paho_c_pub.c:53
enum MQTTReasonCodes rc
Definition: test10.c:1112
const char * CApath
Definition: MQTTAsync.h:1094
void myconnect(MQTTAsync client)
Definition: paho_c_pub.c:274
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
#define MQTTVERSION_DEFAULT
Definition: MQTTAsync.h:195
MQTTAsync_onSuccess5 * onSuccess5
Definition: MQTTAsync.h:1321
char * port
Definition: pubsub_opts.h:48
int null_message
Definition: pubsub_opts.h:38
void onConnect(void *context, MQTTAsync_successData *response)
Definition: paho_c_pub.c:145
int maxdatalen
Definition: pubsub_opts.h:32
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m)
Definition: paho_c_pub.c:73
int len
Definition: utf-8.c:46
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1387
int(* ssl_error_cb)(const char *str, size_t len, void *u)
Definition: MQTTAsync.h:1100
struct pubsub_opts::@69 user_property
char * username
Definition: pubsub_opts.h:45
#define MQTTProperties_initializer


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:10