paho_cs_pub.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2012, 2018 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial contribution
15  * Ian Craggs - add full capability
16  *******************************************************************************/
17 
18 #include "MQTTClient.h"
19 #include "MQTTClientPersistence.h"
20 #include "pubsub_opts.h"
21 
22 #include <stdio.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <stdlib.h>
26 
27 #if defined(_WIN32)
28 #define sleep Sleep
29 #else
30 #include <sys/time.h>
31 #endif
32 
/*
 * Set to 1 by cfinish() to ask the publish loop in main() to stop.
 * sig_atomic_t is the only integer type the C standard guarantees can be
 * written safely from an asynchronous signal handler.
 */
volatile sig_atomic_t toStop = 0;


/**
 * Signal handler: request an orderly shutdown.
 *
 * Restores the default disposition for SIGINT, so that a second Ctrl-C
 * terminates the process immediately, and raises the toStop flag.
 *
 * @param sig the signal number that was delivered (not used)
 */
void cfinish(int sig)
{
	(void)sig;
	/* SIG_DFL is the portable spelling of "default action"; NULL is not a valid handler value */
	signal(SIGINT, SIG_DFL);
	toStop = 1;
}
41 
42 
43 struct pubsub_opts opts =
44 {
45  1, 0, 0, 0, "\n", 100, /* debug/app options */
46  NULL, NULL, 1, 0, 0, /* message options */
47  MQTTVERSION_DEFAULT, NULL, "paho-cs-pub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
48  NULL, NULL, 0, 0, /* will options */
49  0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
50  0, {NULL, NULL}, /* MQTT V5 options */
51 };
52 
53 
55 {
59  int rc = 0;
60 
61  if (opts.verbose)
62  printf("Connecting\n");
63 
64  if (opts.MQTTVersion == MQTTVERSION_5)
65  {
67  conn_opts = conn_opts5;
68  }
69 
70  conn_opts.keepAliveInterval = opts.keepalive;
71  conn_opts.username = opts.username;
72  conn_opts.password = opts.password;
73  conn_opts.MQTTVersion = opts.MQTTVersion;
74 
75  if (opts.will_topic) /* will options */
76  {
77  will_opts.message = opts.will_payload;
78  will_opts.topicName = opts.will_topic;
79  will_opts.qos = opts.will_qos;
80  will_opts.retained = opts.will_retain;
81  conn_opts.will = &will_opts;
82  }
83 
84  if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
85  strncmp(opts.connection, "wss://", 6) == 0))
86  {
87  if (opts.insecure)
88  ssl_opts.verify = 0;
89  else
90  ssl_opts.verify = 1;
91  ssl_opts.CApath = opts.capath;
92  ssl_opts.keyStore = opts.cert;
93  ssl_opts.trustStore = opts.cafile;
94  ssl_opts.privateKey = opts.key;
95  ssl_opts.privateKeyPassword = opts.keypass;
96  ssl_opts.enabledCipherSuites = opts.ciphers;
97  conn_opts.ssl = &ssl_opts;
98  }
99 
100  if (opts.MQTTVersion == MQTTVERSION_5)
101  {
105 
106  conn_opts.cleanstart = 1;
107  response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
108  rc = response.reasonCode;
109  }
110  else
111  {
112  conn_opts.cleansession = 1;
113  rc = MQTTClient_connect(client, &conn_opts);
114  }
115 
116  if (opts.verbose && rc == MQTTCLIENT_SUCCESS)
117  printf("Connected\n");
118  else if (rc != MQTTCLIENT_SUCCESS && !opts.quiet)
119  fprintf(stderr, "Connect failed return code: %s\n", MQTTClient_strerror(rc));
120 
121  return rc;
122 }
123 
124 
125 int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
126 {
127  /* not expecting any messages */
128  return 1;
129 }
130 
131 
133 {
134  fprintf(stderr, "Trace : %d, %s\n", level, message);
135 }
136 
137 
138 int main(int argc, char** argv)
139 {
143  char* buffer = NULL;
144  int rc = 0;
145  char* url;
146  const char* version = NULL;
147 #if !defined(_WIN32)
148  struct sigaction sa;
149 #endif
150  const char* program_name = "paho_cs_pub";
152 
153  if (argc < 2)
154  usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
155 
156  if (getopts(argc, argv, &opts) != 0)
157  usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
158 
159  if (opts.connection)
160  url = opts.connection;
161  else
162  {
163  url = malloc(100);
164  sprintf(url, "%s:%s", opts.host, opts.port);
165  }
166  if (opts.verbose)
167  printf("URL is %s\n", url);
168 
169  if (opts.tracelevel > 0)
170  {
173  }
174 
175  if (opts.MQTTVersion >= MQTTVERSION_5)
176  createOpts.MQTTVersion = MQTTVERSION_5;
178  NULL, &createOpts);
179  if (rc != MQTTCLIENT_SUCCESS)
180  {
181  if (!opts.quiet)
182  fprintf(stderr, "Failed to create client, return code: %s\n", MQTTClient_strerror(rc));
183  exit(EXIT_FAILURE);
184  }
185 
186 #if defined(_WIN32)
187  signal(SIGINT, cfinish);
188  signal(SIGTERM, cfinish);
189 #else
190  memset(&sa, 0, sizeof(struct sigaction));
191  sa.sa_handler = cfinish;
192  sa.sa_flags = 0;
193 
194  sigaction(SIGINT, &sa, NULL);
195  sigaction(SIGTERM, &sa, NULL);
196 #endif
197 
198  rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
199  if (rc != MQTTCLIENT_SUCCESS)
200  {
201  if (!opts.quiet)
202  fprintf(stderr, "Failed to set callbacks, return code: %s\n", MQTTClient_strerror(rc));
203  exit(EXIT_FAILURE);
204  }
205 
206  if (myconnect(client) != MQTTCLIENT_SUCCESS)
207  goto exit;
208 
209  if (opts.MQTTVersion >= MQTTVERSION_5)
210  {
212 
213  if (opts.message_expiry > 0)
214  {
216  property.value.integer4 = opts.message_expiry;
217  MQTTProperties_add(&pub_props, &property);
218  }
219  if (opts.user_property.name)
220  {
221  property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
222  property.value.data.data = opts.user_property.name;
223  property.value.data.len = (int)strlen(opts.user_property.name);
224  property.value.value.data = opts.user_property.value;
225  property.value.value.len = (int)strlen(opts.user_property.value);
226  MQTTProperties_add(&pub_props, &property);
227  }
228  }
229 
230  while (!toStop)
231  {
232  int data_len = 0;
233  int delim_len = 0;
234 
235  if (opts.stdin_lines)
236  {
237  buffer = malloc(opts.maxdatalen);
238 
239  delim_len = (int)strlen(opts.delimiter);
240  do
241  {
242  int c = getchar();
243 
244  if (c < 0)
245  goto exit;
246  buffer[data_len++] = c;
247  if (data_len > delim_len)
248  {
249  if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
250  break;
251  }
252  } while (data_len < opts.maxdatalen);
253  }
254  else if (opts.message)
255  {
256  buffer = opts.message;
257  data_len = (int)strlen(opts.message);
258  }
259  else if (opts.filename)
260  {
261  buffer = readfile(&data_len, &opts);
262  if (buffer == NULL)
263  goto exit;
264  }
265  if (opts.verbose)
266  fprintf(stderr, "Publishing data of length %d\n", data_len);
267 
268  if (opts.MQTTVersion == MQTTVERSION_5)
269  {
271 
272  response = MQTTClient_publish5(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &pub_props, NULL);
273  rc = response.reasonCode;
274  }
275  else
276  rc = MQTTClient_publish(client, opts.topic, data_len, buffer, opts.qos, opts.retained, NULL);
277  if (opts.stdin_lines == 0)
278  break;
279 
280  if (rc != 0)
281  {
282  myconnect(client);
283  if (opts.MQTTVersion == MQTTVERSION_5)
284  {
286 
287  response = MQTTClient_publish5(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &pub_props, NULL);
288  rc = response.reasonCode;
289  }
290  else
291  rc = MQTTClient_publish(client, opts.topic, data_len, buffer, opts.qos, opts.retained, NULL);
292  }
293  if (opts.qos > 0)
295  }
296 
297 exit:
298  if (opts.filename || opts.stdin_lines)
299  free(buffer);
300 
301  if (opts.MQTTVersion == MQTTVERSION_5)
302  rc = MQTTClient_disconnect5(client, 0, MQTTREASONCODE_SUCCESS, NULL);
303  else
304  rc = MQTTClient_disconnect(client, 0);
305 
306  MQTTClient_destroy(&client);
307 
308  return EXIT_SUCCESS;
309 }
int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties *props)
Definition: MQTTClient.c:1919
char * topic
Definition: pubsub_opts.h:41
char * capath
Definition: pubsub_opts.h:58
char * message
Definition: pubsub_opts.h:34
void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char *message)
Definition: paho_cs_pub.c:132
MQTTResponse MQTTClient_publish5(MQTTClient handle, const char *topicName, int payloadlen, const void *payload, int qos, int retained, MQTTProperties *properties, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2247
char * connection
Definition: pubsub_opts.h:49
void cfinish(int sig)
Definition: paho_cs_pub.c:36
#define MQTTCLIENT_SUCCESS
Definition: MQTTClient.h:131
MQTTProperties props
Definition: paho_c_pub.c:54
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions *options)
Definition: MQTTClient.c:1644
char * cert
Definition: pubsub_opts.h:59
int MQTTClient_publish(MQTTClient handle, const char *topicName, int payloadlen, const void *payload, int qos, int retained, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2387
char * will_topic
Definition: pubsub_opts.h:52
int MQTTClient_disconnect(MQTTClient handle, int timeout)
Definition: MQTTClient.c:1908
char * readfile(int *data_len, struct pubsub_opts *opts)
Definition: pubsub_opts.c:430
char * delimiter
Definition: pubsub_opts.h:31
char * filename
Definition: pubsub_opts.h:35
volatile int toStop
Definition: paho_cs_pub.c:33
struct pubsub_opts opts
Definition: paho_cs_pub.c:43
const char * enabledCipherSuites
Definition: MQTTClient.h:696
char * name
Definition: pubsub_opts.h:69
void usage(struct pubsub_opts *opts, pubsub_opts_nameValue *name_values, const char *program_name)
Definition: pubsub_opts.c:40
int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *m)
Definition: paho_cs_pub.c:125
int MQTTProperties_add(MQTTProperties *props, const MQTTProperty *prop)
char * key
Definition: pubsub_opts.h:61
#define malloc(x)
Definition: Heap.h:41
#define MQTTClient_connectOptions_initializer5
Definition: MQTTClient.h:956
int MQTTClient_createWithOptions(MQTTClient *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTClient_createOptions *options)
Definition: MQTTClient.c:364
enum MQTTReasonCodes reasonCode
Definition: MQTTClient.h:991
MQTTCLIENT_TRACE_LEVELS
Definition: MQTTClient.h:1377
MQTTAsync_connectOptions conn_opts
Definition: paho_c_sub.c:191
#define free(x)
Definition: Heap.h:55
#define MQTTClient_willOptions_initializer
Definition: MQTTClient.h:639
void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
Definition: MQTTClient.c:2855
const char * privateKey
Definition: MQTTClient.h:683
char * clientid
Definition: pubsub_opts.h:42
char * host
Definition: pubsub_opts.h:47
int MQTTClient_setCallbacks(MQTTClient handle, void *context, MQTTClient_connectionLost *cl, MQTTClient_messageArrived *ma, MQTTClient_deliveryComplete *dc)
Definition: MQTTClient.c:1032
MQTTClient_SSLOptions * ssl
Definition: MQTTClient.h:895
void MQTTClient_setTraceCallback(MQTTClient_traceCallback *callback)
Definition: MQTTClient.c:2861
char * cafile
Definition: pubsub_opts.h:60
const char * MQTTClient_strerror(int code)
Definition: MQTTClient.c:2903
char * ciphers
Definition: pubsub_opts.h:63
This structure represents a persistent data store, used to store outbound and inbound messages...
MQTTClient_nameValue * MQTTClient_getVersionInfo(void)
Definition: MQTTClient.c:2867
int message_expiry
Definition: pubsub_opts.h:67
enum MQTTPropertyCodes identifier
const char * topicName
Definition: MQTTClient.h:619
const char * keyStore
Definition: MQTTClient.h:678
int main(int argc, char **argv)
Definition: paho_cs_pub.c:138
void * MQTTClient
Definition: MQTTClient.h:246
void MQTTClient_yield(void)
Definition: MQTTClient.c:2730
int MQTTVersion
Definition: pubsub_opts.h:40
void MQTTClient_destroy(MQTTClient *handle)
Definition: MQTTClient.c:556
MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions *options, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTClient.c:1658
version
Definition: setup.py:18
MQTTAsync client
Definition: test6.c:276
int will_retain
Definition: pubsub_opts.h:55
const char * trustStore
Definition: MQTTClient.h:673
char * keypass
Definition: pubsub_opts.h:62
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
MQTTClient c
Definition: test10.c:1656
dictionary context
Definition: test2.py:57
#define MQTTCLIENT_PERSISTENCE_NONE
#define MQTTResponse_initializer
Definition: MQTTClient.h:997
char * value
Definition: pubsub_opts.h:70
#define MQTTClient_connectOptions_initializer
Definition: MQTTClient.h:953
#define MQTTClient_createOptions_initializer
Definition: MQTTClient.h:549
int stdin_lines
Definition: pubsub_opts.h:36
char * password
Definition: pubsub_opts.h:46
#define MQTTClient_SSLOptions_initializer
Definition: MQTTClient.h:769
const char * privateKeyPassword
Definition: MQTTClient.h:686
char * will_payload
Definition: pubsub_opts.h:53
int getopts(int argc, char **argv, struct pubsub_opts *opts)
Definition: pubsub_opts.c:128
const char * CApath
Definition: MQTTClient.h:719
int tracelevel
Definition: pubsub_opts.h:30
const char * message
Definition: MQTTClient.h:621
MQTTProperty property
Definition: paho_c_pub.c:53
enum MQTTReasonCodes rc
Definition: test10.c:1112
#define MQTTVERSION_DEFAULT
Definition: MQTTAsync.h:195
MQTTClient_willOptions * will
Definition: MQTTClient.h:866
char * port
Definition: pubsub_opts.h:48
int maxdatalen
Definition: pubsub_opts.h:32
int myconnect(MQTTClient *client)
Definition: paho_cs_pub.c:54
struct pubsub_opts::@69 user_property
#define MQTTProperties_initializer
char * username
Definition: pubsub_opts.h:45


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:10