paho_cs_sub.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2012, 2018 IBM Corp., and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial contribution
15  * Ian Craggs - change delimiter option from char to string
16  * Guilherme Maciel Ferreira - add keep alive option
17  * Ian Craggs - add full capability
18  *******************************************************************************/
19 
20 #include "MQTTClient.h"
21 #include "MQTTClientPersistence.h"
22 #include "pubsub_opts.h"
23 
24 #include <stdio.h>
25 #include <signal.h>
26 #include <string.h>
27 #include <stdlib.h>
28 
29 
30 #if defined(_WIN32)
31 #define sleep Sleep
32 #else
33 #include <sys/time.h>
34 #endif
35 
36 
/*
 * Stop flag: cfinish() sets it to 1 from signal context and the receive loop
 * in main() polls it. volatile sig_atomic_t is the only object type the C
 * standard allows a signal handler to write to portably.
 */
volatile sig_atomic_t toStop = 0;
38 
39 
40 struct pubsub_opts opts =
41 {
42  0, 0, 0, 0, "\n", 100, /* debug/app options */
43  NULL, NULL, 1, 0, 0, /* message options */
44  MQTTVERSION_DEFAULT, NULL, "paho-cs-sub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
45  NULL, NULL, 0, 0, /* will options */
46  0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
47  0, {NULL, NULL}, /* MQTT V5 options */
48 };
49 
50 
52 {
56  int rc = 0;
57 
58  if (opts.verbose)
59  printf("Connecting\n");
60 
61  if (opts.MQTTVersion == MQTTVERSION_5)
62  {
64  conn_opts = conn_opts5;
65  }
66 
67  conn_opts.keepAliveInterval = opts.keepalive;
68  conn_opts.username = opts.username;
69  conn_opts.password = opts.password;
70  conn_opts.MQTTVersion = opts.MQTTVersion;
71 
72  if (opts.will_topic) /* will options */
73  {
74  will_opts.message = opts.will_payload;
75  will_opts.topicName = opts.will_topic;
76  will_opts.qos = opts.will_qos;
77  will_opts.retained = opts.will_retain;
78  conn_opts.will = &will_opts;
79  }
80 
81  if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
82  strncmp(opts.connection, "wss://", 6) == 0))
83  {
84  if (opts.insecure)
85  ssl_opts.verify = 0;
86  else
87  ssl_opts.verify = 1;
88  ssl_opts.CApath = opts.capath;
89  ssl_opts.keyStore = opts.cert;
90  ssl_opts.trustStore = opts.cafile;
91  ssl_opts.privateKey = opts.key;
92  ssl_opts.privateKeyPassword = opts.keypass;
93  ssl_opts.enabledCipherSuites = opts.ciphers;
94  conn_opts.ssl = &ssl_opts;
95  }
96 
97  if (opts.MQTTVersion == MQTTVERSION_5)
98  {
102 
103  conn_opts.cleanstart = 1;
104  response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
105  rc = response.reasonCode;
106  MQTTResponse_free(response);
107  }
108  else
109  {
110  conn_opts.cleansession = 1;
111  rc = MQTTClient_connect(client, &conn_opts);
112  }
113 
114  if (opts.verbose && rc == MQTTCLIENT_SUCCESS)
115  fprintf(stderr, "Connected\n");
116  else if (rc != MQTTCLIENT_SUCCESS && !opts.quiet)
117  fprintf(stderr, "Connect failed return code: %s\n", MQTTClient_strerror(rc));
118 
119  return rc;
120 }
121 
122 
123 void cfinish(int sig)
124 {
125  signal(SIGINT, NULL);
126  toStop = 1;
127 }
128 
129 
131 {
132  fprintf(stderr, "Trace : %d, %s\n", level, message);
133 }
134 
135 
136 int main(int argc, char** argv)
137 {
141  int rc = 0;
142  char* url;
143  const char* version = NULL;
144 #if !defined(_WIN32)
145  struct sigaction sa;
146 #endif
147  const char* program_name = "paho_cs_sub";
149 
150  if (argc < 2)
151  usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
152 
153  if (getopts(argc, argv, &opts) != 0)
154  usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
155 
156  if (strchr(opts.topic, '#') || strchr(opts.topic, '+'))
157  opts.verbose = 1;
158 
159  if (opts.connection)
160  url = opts.connection;
161  else
162  {
163  url = malloc(100);
164  sprintf(url, "%s:%s", opts.host, opts.port);
165  }
166  if (opts.verbose)
167  printf("URL is %s\n", url);
168 
169  if (opts.tracelevel > 0)
170  {
173  }
174 
175  if (opts.MQTTVersion >= MQTTVERSION_5)
176  createOpts.MQTTVersion = MQTTVERSION_5;
178  NULL, &createOpts);
179  if (rc != MQTTCLIENT_SUCCESS)
180  {
181  if (!opts.quiet)
182  fprintf(stderr, "Failed to create client, return code: %s\n", MQTTClient_strerror(rc));
183  exit(EXIT_FAILURE);
184  }
185 
186 #if defined(_WIN32)
187  signal(SIGINT, cfinish);
188  signal(SIGTERM, cfinish);
189 #else
190  memset(&sa, 0, sizeof(struct sigaction));
191  sa.sa_handler = cfinish;
192  sa.sa_flags = 0;
193 
194  sigaction(SIGINT, &sa, NULL);
195  sigaction(SIGTERM, &sa, NULL);
196 #endif
197 
198  if (myconnect(client) != MQTTCLIENT_SUCCESS)
199  goto exit;
200 
201  if (opts.MQTTVersion >= MQTTVERSION_5)
202  {
203  MQTTResponse response = MQTTClient_subscribe5(client, opts.topic, opts.qos, NULL, NULL);
204  rc = response.reasonCode;
205  MQTTResponse_free(response);
206  }
207  else
208  rc = MQTTClient_subscribe(client, opts.topic, opts.qos);
209  if (rc != MQTTCLIENT_SUCCESS && rc != opts.qos)
210  {
211  if (!opts.quiet)
212  fprintf(stderr, "Error %d subscribing to topic %s\n", rc, opts.topic);
213  goto exit;
214  }
215 
216  while (!toStop)
217  {
218  char* topicName = NULL;
219  int topicLen;
220  MQTTClient_message* message = NULL;
221 
222  rc = MQTTClient_receive(client, &topicName, &topicLen, &message, 1000);
223  if (message)
224  {
225  size_t delimlen = 0;
226 
227  if (opts.verbose)
228  printf("%s\t", topicName);
229  if (opts.delimiter)
230  delimlen = strlen(opts.delimiter);
231  if (opts.delimiter == NULL || (message->payloadlen > delimlen &&
232  strncmp(opts.delimiter, &((char*)message->payload)[message->payloadlen - delimlen], delimlen) == 0))
233  printf("%.*s", message->payloadlen, (char*)message->payload);
234  else
235  printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter);
236  if (message->struct_version == 1 && opts.verbose)
237  logProperties(&message->properties);
238  fflush(stdout);
239  MQTTClient_freeMessage(&message);
240  MQTTClient_free(topicName);
241  }
242  if (rc != 0)
243  myconnect(&client);
244  }
245 
246 exit:
247  MQTTClient_disconnect(client, 0);
248 
249  MQTTClient_destroy(&client);
250 
251  return EXIT_SUCCESS;
252 }
char * topic
Definition: pubsub_opts.h:41
char * capath
Definition: pubsub_opts.h:58
int MQTTClient_receive(MQTTClient handle, char **topicName, int *topicLen, MQTTClient_message **message, unsigned long timeout)
Definition: MQTTClient.c:2674
volatile int toStop
Definition: paho_cs_sub.c:37
char * message
Definition: pubsub_opts.h:34
char * connection
Definition: pubsub_opts.h:49
#define MQTTCLIENT_SUCCESS
Definition: MQTTClient.h:131
MQTTProperties props
Definition: paho_c_pub.c:54
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions *options)
Definition: MQTTClient.c:1644
char * cert
Definition: pubsub_opts.h:59
char * will_topic
Definition: pubsub_opts.h:52
int MQTTClient_disconnect(MQTTClient handle, int timeout)
Definition: MQTTClient.c:1908
char * delimiter
Definition: pubsub_opts.h:31
const char * enabledCipherSuites
Definition: MQTTClient.h:696
void usage(struct pubsub_opts *opts, pubsub_opts_nameValue *name_values, const char *program_name)
Definition: pubsub_opts.c:40
char * key
Definition: pubsub_opts.h:61
#define malloc(x)
Definition: Heap.h:41
#define MQTTClient_connectOptions_initializer5
Definition: MQTTClient.h:956
int MQTTClient_createWithOptions(MQTTClient *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTClient_createOptions *options)
Definition: MQTTClient.c:364
enum MQTTReasonCodes reasonCode
Definition: MQTTClient.h:991
void MQTTResponse_free(MQTTResponse response)
Definition: MQTTClient.c:620
MQTTCLIENT_TRACE_LEVELS
Definition: MQTTClient.h:1377
MQTTAsync_connectOptions conn_opts
Definition: paho_c_sub.c:191
#define MQTTClient_willOptions_initializer
Definition: MQTTClient.h:639
void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
Definition: MQTTClient.c:2855
const char * privateKey
Definition: MQTTClient.h:683
char * clientid
Definition: pubsub_opts.h:42
char * host
Definition: pubsub_opts.h:47
MQTTClient_SSLOptions * ssl
Definition: MQTTClient.h:895
void MQTTClient_setTraceCallback(MQTTClient_traceCallback *callback)
Definition: MQTTClient.c:2861
char * cafile
Definition: pubsub_opts.h:60
const char * MQTTClient_strerror(int code)
Definition: MQTTClient.c:2903
char * ciphers
Definition: pubsub_opts.h:63
This structure represents a persistent data store, used to store outbound and inbound messages...
MQTTClient_nameValue * MQTTClient_getVersionInfo(void)
Definition: MQTTClient.c:2867
const char * topicName
Definition: MQTTClient.h:619
void MQTTClient_freeMessage(MQTTClient_message **message)
Definition: MQTTClient.c:601
const char * keyStore
Definition: MQTTClient.h:678
void * MQTTClient
Definition: MQTTClient.h:246
int MQTTVersion
Definition: pubsub_opts.h:40
void MQTTClient_destroy(MQTTClient *handle)
Definition: MQTTClient.c:556
MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions *options, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTClient.c:1658
MQTTProperties properties
Definition: MQTTClient.h:324
version
Definition: setup.py:18
MQTTAsync client
Definition: test6.c:276
int will_retain
Definition: pubsub_opts.h:55
const char * trustStore
Definition: MQTTClient.h:673
char * keypass
Definition: pubsub_opts.h:62
int main(int argc, char **argv)
Definition: paho_cs_sub.c:136
int MQTTClient_subscribe(MQTTClient handle, const char *topic, int qos)
Definition: MQTTClient.c:2104
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
void logProperties(MQTTProperties *props)
Definition: pubsub_opts.c:466
MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char *topic, int qos, MQTTSubscribe_options *opts, MQTTProperties *props)
Definition: MQTTClient.c:2090
void MQTTClient_free(void *memory)
Definition: MQTTClient.c:612
#define MQTTCLIENT_PERSISTENCE_NONE
void cfinish(int sig)
Definition: paho_cs_sub.c:123
#define MQTTResponse_initializer
Definition: MQTTClient.h:997
#define MQTTClient_connectOptions_initializer
Definition: MQTTClient.h:953
void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char *message)
Definition: paho_cs_sub.c:130
#define MQTTClient_createOptions_initializer
Definition: MQTTClient.h:549
char * password
Definition: pubsub_opts.h:46
#define MQTTClient_SSLOptions_initializer
Definition: MQTTClient.h:769
const char * privateKeyPassword
Definition: MQTTClient.h:686
char * will_payload
Definition: pubsub_opts.h:53
int getopts(int argc, char **argv, struct pubsub_opts *opts)
Definition: pubsub_opts.c:128
int myconnect(MQTTClient *client)
Definition: paho_cs_sub.c:51
const char * CApath
Definition: MQTTClient.h:719
int tracelevel
Definition: pubsub_opts.h:30
struct pubsub_opts opts
Definition: paho_cs_sub.c:40
const char * message
Definition: MQTTClient.h:621
enum MQTTReasonCodes rc
Definition: test10.c:1112
#define MQTTVERSION_DEFAULT
Definition: MQTTAsync.h:195
MQTTClient_willOptions * will
Definition: MQTTClient.h:866
char * port
Definition: pubsub_opts.h:48
#define MQTTProperties_initializer
char * username
Definition: pubsub_opts.h:45


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:10