MQTTPacketOut.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs, Allan Stockdill-Mander - SSL updates
16  * Ian Craggs - MQTT 3.1.1 support
17  * Rong Xiang, Ian Craggs - C++ compatibility
18  * Ian Craggs - binary password and will payload
19  * Ian Craggs - MQTT 5.0 support
20  *******************************************************************************/
21 
30 #include "MQTTPacketOut.h"
31 #include "Log.h"
32 #include "StackTrace.h"
33 
34 #include <string.h>
35 #include <stdlib.h>
36 
37 #include "Heap.h"
38 
39 
48 int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
49  MQTTProperties* connectProperties, MQTTProperties* willProperties)
50 {
51  char *buf, *ptr;
52  Connect packet;
53  int rc = SOCKET_ERROR, len;
54 
55  FUNC_ENTRY;
56  packet.header.byte = 0;
57  packet.header.bits.type = CONNECT;
58 
59  len = ((MQTTVersion == MQTTVERSION_3_1) ? 12 : 10) + (int)strlen(client->clientID)+2;
60  if (client->will)
61  len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
62  if (client->username)
63  len += (int)strlen(client->username)+2;
64  if (client->password)
65  len += client->passwordlen+2;
66  if (MQTTVersion >= MQTTVERSION_5)
67  {
68  len += MQTTProperties_len(connectProperties);
69  if (client->will)
70  len += MQTTProperties_len(willProperties);
71  }
72 
73  ptr = buf = malloc(len);
74  if (ptr == NULL)
75  goto exit_nofree;
76  if (MQTTVersion == MQTTVERSION_3_1)
77  {
78  writeUTF(&ptr, "MQIsdp");
79  writeChar(&ptr, (char)MQTTVERSION_3_1);
80  }
81  else if (MQTTVersion == MQTTVERSION_3_1_1 || MQTTVersion == MQTTVERSION_5)
82  {
83  writeUTF(&ptr, "MQTT");
84  writeChar(&ptr, (char)MQTTVersion);
85  }
86  else
87  goto exit;
88 
89  packet.flags.all = 0;
90  if (MQTTVersion >= MQTTVERSION_5)
91  packet.flags.bits.cleanstart = client->cleanstart;
92  else
93  packet.flags.bits.cleanstart = client->cleansession;
94  packet.flags.bits.will = (client->will) ? 1 : 0;
95  if (packet.flags.bits.will)
96  {
97  packet.flags.bits.willQoS = client->will->qos;
98  packet.flags.bits.willRetain = client->will->retained;
99  }
100  if (client->username)
101  packet.flags.bits.username = 1;
102  if (client->password)
103  packet.flags.bits.password = 1;
104 
105  writeChar(&ptr, packet.flags.all);
106  writeInt(&ptr, client->keepAliveInterval);
107  if (MQTTVersion >= MQTTVERSION_5)
108  MQTTProperties_write(&ptr, connectProperties);
109  writeUTF(&ptr, client->clientID);
110  if (client->will)
111  {
112  if (MQTTVersion >= MQTTVERSION_5)
113  MQTTProperties_write(&ptr, willProperties);
114  writeUTF(&ptr, client->will->topic);
115  writeData(&ptr, client->will->payload, client->will->payloadlen);
116  }
117  if (client->username)
118  writeUTF(&ptr, client->username);
119  if (client->password)
120  writeData(&ptr, client->password, client->passwordlen);
121 
122  rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1, MQTTVersion);
123  Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID,
124  MQTTVersion, client->cleansession, rc);
125 exit:
126  if (rc != TCPSOCKET_INTERRUPTED)
127  free(buf);
128 exit_nofree:
129  FUNC_EXIT_RC(rc);
130  return rc;
131 }
132 
133 
142 void* MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
143 {
144  Connack* pack = NULL;
145  char* curdata = data;
146  char* enddata = &data[datalen];
147 
148  FUNC_ENTRY;
149  if ((pack = malloc(sizeof(Connack))) == NULL)
150  goto exit;
151  pack->MQTTVersion = MQTTVersion;
152  pack->header.byte = aHeader;
153  pack->flags.all = readChar(&curdata); /* connect flags */
154  pack->rc = readChar(&curdata); /* reason code */
155  if (MQTTVersion < MQTTVERSION_5)
156  {
157  if (datalen != 2)
158  {
159  free(pack);
160  pack = NULL;
161  }
162  }
163  else if (datalen > 2)
164  {
166  pack->properties = props;
167  if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
168  {
169  if (pack->properties.array)
170  free(pack->properties.array);
171  if (pack)
172  free(pack);
173  pack = NULL; /* signal protocol error */
174  goto exit;
175  }
176  }
177 exit:
178  FUNC_EXIT;
179  return pack;
180 }
181 
182 
188 {
189  FUNC_ENTRY;
190  if (pack->MQTTVersion >= MQTTVERSION_5)
192  free(pack);
193  FUNC_EXIT;
194 }
195 
196 
203 int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
204 {
205  Header header;
206  int rc = 0;
207 
208  FUNC_ENTRY;
209  header.byte = 0;
210  header.bits.type = PINGREQ;
211  rc = MQTTPacket_send(net, header, NULL, 0, 0, MQTTVERSION_3_1_1);
212  Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc);
213  FUNC_EXIT_RC(rc);
214  return rc;
215 }
216 
217 
229  int msgid, int dup, Clients* client)
230 {
231  Header header;
232  char *data, *ptr;
233  int rc = -1;
234  ListElement *elem = NULL, *qosElem = NULL;
235  int datalen, i = 0;
236 
237  FUNC_ENTRY;
238  header.bits.type = SUBSCRIBE;
239  header.bits.dup = dup;
240  header.bits.qos = 1;
241  header.bits.retain = 0;
242 
243  datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
244  while (ListNextElement(topics, &elem))
245  datalen += (int)strlen((char*)(elem->content));
246  if (client->MQTTVersion >= MQTTVERSION_5)
247  datalen += MQTTProperties_len(props);
248 
249  ptr = data = malloc(datalen);
250  if (ptr == NULL)
251  goto exit;
252  writeInt(&ptr, msgid);
253 
254  if (client->MQTTVersion >= MQTTVERSION_5)
255  MQTTProperties_write(&ptr, props);
256 
257  elem = NULL;
258  while (ListNextElement(topics, &elem))
259  {
260  char subopts = 0;
261 
262  ListNextElement(qoss, &qosElem);
263  writeUTF(&ptr, (char*)(elem->content));
264  subopts = *(int*)(qosElem->content);
265  if (client->MQTTVersion >= MQTTVERSION_5 && opts != NULL)
266  {
267  subopts |= (opts[i].noLocal << 2); /* 1 bit */
268  subopts |= (opts[i].retainAsPublished << 3); /* 1 bit */
269  subopts |= (opts[i].retainHandling << 4); /* 2 bits */
270  }
271  writeChar(&ptr, subopts);
272  ++i;
273  }
274  rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
275  Log(LOG_PROTOCOL, 22, NULL, client->net.socket, client->clientID, msgid, rc);
276  if (rc != TCPSOCKET_INTERRUPTED)
277  free(data);
278 exit:
279  FUNC_EXIT_RC(rc);
280  return rc;
281 }
282 
283 
292 void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
293 {
294  Suback* pack = NULL;
295  char* curdata = data;
296  char* enddata = &data[datalen];
297 
298  FUNC_ENTRY;
299  if ((pack = malloc(sizeof(Suback))) == NULL)
300  goto exit;
301  pack->MQTTVersion = MQTTVersion;
302  pack->header.byte = aHeader;
303  pack->msgId = readInt(&curdata);
304  if (MQTTVersion >= MQTTVERSION_5)
305  {
307  pack->properties = props;
308  if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
309  {
310  if (pack->properties.array)
311  free(pack->properties.array);
312  if (pack)
313  free(pack);
314  pack = NULL; /* signal protocol error */
315  goto exit;
316  }
317  }
318  pack->qoss = ListInitialize();
319  while ((size_t)(curdata - data) < datalen)
320  {
321  unsigned int* newint;
322  newint = malloc(sizeof(unsigned int));
323  if (newint == NULL)
324  {
325  if (pack->properties.array)
326  free(pack->properties.array);
327  if (pack)
328  free(pack);
329  pack = NULL; /* signal protocol error */
330  goto exit;
331  }
332  *newint = (unsigned int)readChar(&curdata);
333  ListAppend(pack->qoss, newint, sizeof(unsigned int));
334  }
335  if (pack->qoss->count == 0)
336  {
337  if (pack->properties.array)
338  free(pack->properties.array);
339  if (pack)
340  free(pack);
341  ListFree(pack->qoss);
342  pack = NULL;
343  }
344 exit:
345  FUNC_EXIT;
346  return pack;
347 }
348 
349 
360 {
361  Header header;
362  char *data, *ptr;
363  int rc = SOCKET_ERROR;
364  ListElement *elem = NULL;
365  int datalen;
366 
367  FUNC_ENTRY;
368  header.bits.type = UNSUBSCRIBE;
369  header.bits.dup = dup;
370  header.bits.qos = 1;
371  header.bits.retain = 0;
372 
373  datalen = 2 + topics->count * 2; /* utf length == 2 */
374  while (ListNextElement(topics, &elem))
375  datalen += (int)strlen((char*)(elem->content));
376  if (client->MQTTVersion >= MQTTVERSION_5)
377  datalen += MQTTProperties_len(props);
378  ptr = data = malloc(datalen);
379  if (ptr == NULL)
380  goto exit;
381 
382  writeInt(&ptr, msgid);
383 
384  if (client->MQTTVersion >= MQTTVERSION_5)
385  MQTTProperties_write(&ptr, props);
386 
387  elem = NULL;
388  while (ListNextElement(topics, &elem))
389  writeUTF(&ptr, (char*)(elem->content));
390  rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
391  Log(LOG_PROTOCOL, 25, NULL, client->net.socket, client->clientID, msgid, rc);
392  if (rc != TCPSOCKET_INTERRUPTED)
393  free(data);
394 exit:
395  FUNC_EXIT_RC(rc);
396  return rc;
397 }
398 
399 
408 void* MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
409 {
410  Unsuback* pack = NULL;
411  char* curdata = data;
412  char* enddata = &data[datalen];
413 
414  FUNC_ENTRY;
415  if ((pack = malloc(sizeof(Unsuback))) == NULL)
416  goto exit;
417  pack->MQTTVersion = MQTTVersion;
418  pack->header.byte = aHeader;
419  pack->msgId = readInt(&curdata);
420  pack->reasonCodes = NULL;
421  if (MQTTVersion >= MQTTVERSION_5)
422  {
424  pack->properties = props;
425  if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
426  {
427  if (pack->properties.array)
428  free(pack->properties.array);
429  if (pack)
430  free(pack);
431  pack = NULL; /* signal protocol error */
432  goto exit;
433  }
434  pack->reasonCodes = ListInitialize();
435  while ((size_t)(curdata - data) < datalen)
436  {
437  enum MQTTReasonCodes* newrc;
438  newrc = malloc(sizeof(enum MQTTReasonCodes));
439  if (newrc == NULL)
440  {
441  if (pack->properties.array)
442  free(pack->properties.array);
443  if (pack)
444  free(pack);
445  pack = NULL; /* signal protocol error */
446  goto exit;
447  }
448  *newrc = (enum MQTTReasonCodes)readChar(&curdata);
449  ListAppend(pack->reasonCodes, newrc, sizeof(enum MQTTReasonCodes));
450  }
451  if (pack->reasonCodes->count == 0)
452  {
453  ListFree(pack->reasonCodes);
454  if (pack->properties.array)
455  free(pack->properties.array);
456  if (pack)
457  free(pack);
458  pack = NULL;
459  }
460  }
461 exit:
462  FUNC_EXIT;
463  return pack;
464 }
MQTTProperties properties
Definition: MQTTPacket.h:176
Header header
Definition: MQTTPacket.h:135
#define LOG_PROTOCOL
Definition: Log.h:64
union Connack::@62 flags
int MQTTProperties_read(MQTTProperties *properties, char **pptr, char *enddata)
MQTTProperties properties
Definition: MQTTPacket.h:155
int msgId
Definition: MQTTPacket.h:174
void * MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char *data, size_t datalen)
int MQTTProperties_len(MQTTProperties *props)
int MQTTProperties_write(char **pptr, const MQTTProperties *properties)
MQTTReasonCodes
#define FUNC_EXIT
Definition: StackTrace.h:59
MQTTProperties props
Definition: paho_c_pub.c:54
unsigned int MQTTVersion
Definition: MQTTPacket.h:154
int readInt(char **pptr)
Definition: MQTTPacket.c:359
int MQTTVersion
Definition: Clients.h:146
bool retain
Definition: MQTTPacket.h:77
void MQTTProperties_free(MQTTProperties *props)
char byte
Definition: MQTTPacket.h:65
void MQTTPacket_freeConnack(Connack *pack)
unsigned char readChar(char **pptr)
Definition: MQTTPacket.c:425
struct Header::@59 bits
int MQTTPacket_send_connect(Clients *client, int MQTTVersion, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTPacketOut.c:48
int MQTTVersion
Definition: MQTTPacket.h:175
struct pubsub_opts opts
Definition: paho_c_pub.c:42
#define malloc(x)
Definition: Heap.h:41
struct Connect::@60::@61 bits
void writeInt(char **pptr, int anInt)
Definition: MQTTPacket.c:450
int msgId
Definition: MQTTPacket.h:187
unsigned char retainAsPublished
unsigned char all
Definition: MQTTPacket.h:94
MQTTProperties properties
Definition: MQTTPacket.h:189
void writeChar(char **pptr, char c)
Definition: MQTTPacket.c:438
#define free(x)
Definition: Heap.h:55
int passwordlen
Definition: Clients.h:121
unsigned int type
Definition: MQTTPacket.h:80
networkHandles net
Definition: Clients.h:129
void * MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char *data, size_t datalen)
#define FUNC_EXIT_RC(x)
Definition: StackTrace.h:63
#define SOCKET_ERROR
Definition: Socket.h:76
int count
Definition: LinkedList.h:72
ListElement * ListNextElement(List *aList, ListElement **pos)
Definition: LinkedList.c:411
List * reasonCodes
Definition: MQTTPacket.h:190
int keepAliveInterval
Definition: Clients.h:131
bool dup
Definition: MQTTPacket.h:79
int MQTTPacket_send_subscribe(List *topics, List *qoss, MQTTSubscribe_options *opts, MQTTProperties *props, int msgid, int dup, Clients *client)
int payloadlen
Definition: Clients.h:72
void writeData(char **pptr, const void *data, int datalen)
Definition: MQTTPacket.c:479
#define MQTTVERSION_3_1_1
Definition: MQTTAsync.h:203
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
Definition: Log.c:417
void * payload
Definition: Clients.h:73
Header header
Definition: MQTTPacket.h:91
int MQTTPacket_send_unsubscribe(List *topics, MQTTProperties *props, int msgid, int dup, Clients *client)
int retained
Definition: Clients.h:74
void writeUTF(char **pptr, const char *string)
Definition: MQTTPacket.c:464
ListElement * ListAppend(List *aList, void *content, size_t size)
Definition: LinkedList.c:90
Header header
Definition: MQTTPacket.h:173
Header header
Definition: MQTTPacket.h:186
void * MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char *data, size_t datalen)
List * qoss
Definition: MQTTPacket.h:177
unsigned char all
Definition: MQTTPacket.h:138
MQTTAsync client
Definition: test6.c:276
MQTTProperty * array
#define FUNC_ENTRY
Definition: StackTrace.h:55
unsigned char rc
Definition: MQTTPacket.h:153
unsigned int cleanstart
Definition: Clients.h:124
int MQTTVersion
Definition: MQTTPacket.h:188
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
union Connect::@60 flags
const void * ptr(const T *p)
Definition: format.h:3610
const void * password
Definition: Clients.h:122
char * topic
Definition: Clients.h:71
int MQTTPacket_send_pingreq(networkHandles *net, const char *clientID)
int MQTTPacket_send(networkHandles *net, Header header, char *buffer, size_t buflen, int freeData, int MQTTVersion)
Definition: MQTTPacket.c:190
List * ListInitialize(void)
Definition: LinkedList.c:52
char * clientID
Definition: Clients.h:119
unsigned char retainHandling
dictionary data
Definition: mqtt_test.py:22
char * topics[]
#define MQTTVERSION_3_1
Definition: MQTTAsync.h:199
void ListFree(List *aList)
Definition: LinkedList.c:381
enum MQTTReasonCodes rc
Definition: test10.c:1112
const char * username
Definition: Clients.h:120
unsigned int qos
Definition: MQTTPacket.h:78
#define TCPSOCKET_INTERRUPTED
Definition: Socket.h:79
unsigned int cleansession
Definition: Clients.h:123
int len
Definition: utf-8.c:46
willMessages * will
Definition: Clients.h:134
#define MQTTProperties_initializer


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:09