MQTTPacket.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs, Allan Stockdill-Mander - SSL updates
16  * Ian Craggs - MQTT 3.1.1 support
17  * Ian Craggs - fix for issue 453
18  * Ian Craggs - MQTT 5.0 support
19  *******************************************************************************/
20 
28 #include "MQTTPacket.h"
29 #include "Log.h"
30 #if !defined(NO_PERSISTENCE)
31  #include "MQTTPersistence.h"
32 #endif
33 #include "Messages.h"
34 #include "StackTrace.h"
35 #include "WebSocket.h"
36 #include "MQTTTime.h"
37 
38 #include <stdlib.h>
39 #include <string.h>
40 
41 #include "Heap.h"
42 
43 #if !defined(min)
44 #define min(A,B) ( (A) < (B) ? (A):(B))
45 #endif
46 
50 static const char *packet_names[] =
51 {
52  "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
53  "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
54  "PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
55 };
56 
58 
59 
65 const char* MQTTPacket_name(int ptype)
66 {
67  return (ptype >= 0 && ptype <= AUTH) ? packet_names[ptype] : "UNKNOWN";
68 }
69 
74 {
75  NULL,
76  NULL,
83  NULL,
85  NULL,
90  MQTTPacket_ack
91 };
92 
93 
94 static char* readUTFlen(char** pptr, char* enddata, int* len);
95 static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net);
96 
103 void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error)
104 {
105  char* data = NULL;
106  static Header header;
107  size_t remaining_length;
108  int ptype;
109  void* pack = NULL;
110  size_t actual_len = 0;
111 
112  FUNC_ENTRY;
113  *error = SOCKET_ERROR; /* indicate whether an error occurred, or not */
114 
115  const size_t headerWsFramePos = WebSocket_framePos();
116 
117  /* read the packet data from the socket */
118  *error = WebSocket_getch(net, &header.byte);
119  if (*error != TCPSOCKET_COMPLETE) /* first byte is the header byte */
120  goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
121 
122  /* now read the remaining length, so we know how much more to read */
123  if ((*error = MQTTPacket_decode(net, &remaining_length)) != TCPSOCKET_COMPLETE)
124  goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
125 
126  /* now read the rest, the variable header and payload */
127  data = WebSocket_getdata(net, remaining_length, &actual_len);
128  if (remaining_length && data == NULL)
129  {
130  *error = SOCKET_ERROR;
131  goto exit; /* socket error */
132  }
133 
134  if (actual_len < remaining_length)
135  *error = TCPSOCKET_INTERRUPTED;
136  else
137  {
138  ptype = header.bits.type;
139  if (ptype < CONNECT || (MQTTVersion < MQTTVERSION_5 && ptype >= DISCONNECT) ||
140  (MQTTVersion >= MQTTVERSION_5 && ptype > AUTH) ||
141  new_packets[ptype] == NULL)
142  Log(TRACE_MIN, 2, NULL, ptype);
143  else
144  {
145  if ((pack = (*new_packets[ptype])(MQTTVersion, header.byte, data, remaining_length)) == NULL)
146  {
147  *error = SOCKET_ERROR; // was BAD_MQTT_PACKET;
148  Log(LOG_ERROR, -1, "Bad MQTT packet, type %d", ptype);
149  }
150 #if !defined(NO_PERSISTENCE)
151  else if (header.bits.type == PUBLISH && header.bits.qos == 2)
152  {
153  int buf0len;
154  char *buf = malloc(10);
155 
156  if (buf == NULL)
157  {
158  *error = SOCKET_ERROR;
159  goto exit;
160  }
161  buf[0] = header.byte;
162  buf0len = 1 + MQTTPacket_encode(&buf[1], remaining_length);
163  *error = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1,
164  &data, &remaining_length, header.bits.type, ((Publish *)pack)->msgId, 1, MQTTVersion);
165  free(buf);
166  }
167 #endif
168  }
169  }
170  if (pack)
171  net->lastReceived = MQTTTime_now();
172 exit:
173  if (*error == TCPSOCKET_INTERRUPTED)
174  WebSocket_framePosSeekTo(headerWsFramePos);
175 
176  FUNC_EXIT_RC(*error);
177  return pack;
178 }
179 
180 
190 int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int freeData,
191  int MQTTVersion)
192 {
193  int rc = SOCKET_ERROR;
194  size_t buf0len;
195  char *buf;
196  PacketBuffers packetbufs;
197 
198  FUNC_ENTRY;
199  buf0len = 1 + MQTTPacket_encode(NULL, buflen);
200  buf = malloc(buf0len);
201  if (buf == NULL)
202  {
203  rc = SOCKET_ERROR;
204  goto exit;
205  }
206  buf[0] = header.byte;
207  MQTTPacket_encode(&buf[1], buflen);
208 
209 #if !defined(NO_PERSISTENCE)
210  if (header.bits.type == PUBREL)
211  {
212  char* ptraux = buffer;
213  int msgId = readInt(&ptraux);
214 
215  rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1, &buffer, &buflen,
216  header.bits.type, msgId, 0, MQTTVersion);
217  }
218 #endif
219  packetbufs.count = 1;
220  packetbufs.buffers = &buffer;
221  packetbufs.buflens = &buflen;
222  packetbufs.frees = &freeData;
223  memset(packetbufs.mask, '\0', sizeof(packetbufs.mask));
224  rc = WebSocket_putdatas(net, &buf, &buf0len, &packetbufs);
225 
226  if (rc == TCPSOCKET_COMPLETE)
227  net->lastSent = MQTTTime_now();
228 
229  if (rc != TCPSOCKET_INTERRUPTED)
230  free(buf);
231 
232 exit:
233  FUNC_EXIT_RC(rc);
234  return rc;
235 }
236 
237 
238 
239 
250 int MQTTPacket_sends(networkHandles* net, Header header, PacketBuffers* bufs, int MQTTVersion)
251 {
252  int i, rc = SOCKET_ERROR;
253  size_t buf0len, total = 0;
254  char *buf;
255 
256  FUNC_ENTRY;
257  for (i = 0; i < bufs->count; i++)
258  total += bufs->buflens[i];
259  buf0len = 1 + MQTTPacket_encode(NULL, total);
260  buf = malloc(buf0len);
261  if (buf == NULL)
262  {
263  rc = SOCKET_ERROR;
264  goto exit;
265  }
266  buf[0] = header.byte;
267  MQTTPacket_encode(&buf[1], total);
268 
269 #if !defined(NO_PERSISTENCE)
270  if (header.bits.type == PUBLISH && header.bits.qos != 0)
271  { /* persist PUBLISH QoS1 and Qo2 */
272  char *ptraux = bufs->buffers[2];
273  int msgId = readInt(&ptraux);
274  rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, bufs->count, bufs->buffers, bufs->buflens,
275  header.bits.type, msgId, 0, MQTTVersion);
276  }
277 #endif
278  rc = WebSocket_putdatas(net, &buf, &buf0len, bufs);
279 
280  if (rc == TCPSOCKET_COMPLETE)
281  net->lastSent = MQTTTime_now();
282 
283  if (rc != TCPSOCKET_INTERRUPTED)
284  free(buf);
285 exit:
286  FUNC_EXIT_RC(rc);
287  return rc;
288 }
289 
290 
297 int MQTTPacket_encode(char* buf, size_t length)
298 {
299  int rc = 0;
300 
301  FUNC_ENTRY;
302  do
303  {
304  char d = length % 128;
305  length /= 128;
306  /* if there are more digits to encode, set the top bit of this digit */
307  if (length > 0)
308  d |= 0x80;
309  if (buf)
310  buf[rc++] = d;
311  else
312  rc++;
313  } while (length > 0);
314  FUNC_EXIT_RC(rc);
315  return rc;
316 }
317 
318 
326 {
327  int rc = SOCKET_ERROR;
328  char c;
329  int multiplier = 1;
330  int len = 0;
331 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
332 
333  FUNC_ENTRY;
334  *value = 0;
335  do
336  {
338  {
339  rc = SOCKET_ERROR; /* bad data */
340  goto exit;
341  }
342  rc = WebSocket_getch(net, &c);
343  if (rc != TCPSOCKET_COMPLETE)
344  goto exit;
345  *value += (c & 127) * multiplier;
346  multiplier *= 128;
347  } while ((c & 128) != 0);
348 exit:
349  FUNC_EXIT_RC(rc);
350  return rc;
351 }
352 
353 
359 int readInt(char** pptr)
360 {
361  char* ptr = *pptr;
362  int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
363  *pptr += 2;
364  return len;
365 }
366 
367 
380 static char* readUTFlen(char** pptr, char* enddata, int* len)
381 {
382  char* string = NULL;
383 
384  FUNC_ENTRY;
385  if (enddata - (*pptr) > 1) /* enough length to read the integer? */
386  {
387  *len = readInt(pptr);
388  if (&(*pptr)[*len] <= enddata)
389  {
390  if ((string = malloc(*len+1)) == NULL)
391  goto exit;
392  memcpy(string, *pptr, *len);
393  string[*len] = '\0';
394  *pptr += *len;
395  }
396  }
397 exit:
398  FUNC_EXIT;
399  return string;
400 }
401 
402 
413 char* readUTF(char** pptr, char* enddata)
414 {
415  int len;
416  return readUTFlen(pptr, enddata, &len);
417 }
418 
419 
425 unsigned char readChar(char** pptr)
426 {
427  unsigned char c = **pptr;
428  (*pptr)++;
429  return c;
430 }
431 
432 
438 void writeChar(char** pptr, char c)
439 {
440  **pptr = c;
441  (*pptr)++;
442 }
443 
444 
450 void writeInt(char** pptr, int anInt)
451 {
452  **pptr = (char)(anInt / 256);
453  (*pptr)++;
454  **pptr = (char)(anInt % 256);
455  (*pptr)++;
456 }
457 
458 
464 void writeUTF(char** pptr, const char* string)
465 {
466  size_t len = strlen(string);
467  writeInt(pptr, (int)len);
468  memcpy(*pptr, string, len);
469  *pptr += len;
470 }
471 
472 
479 void writeData(char** pptr, const void* data, int datalen)
480 {
481  writeInt(pptr, datalen);
482  memcpy(*pptr, data, datalen);
483  *pptr += datalen;
484 }
485 
486 
495 void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
496 {
497  static unsigned char header = 0;
498  header = aHeader;
499  return &header;
500 }
501 
502 
509 {
510  Header header;
511  int rc = 0;
512 
513  FUNC_ENTRY;
514  header.byte = 0;
515  header.bits.type = DISCONNECT;
516 
517  if (client->MQTTVersion >= 5 && (props || reason != MQTTREASONCODE_SUCCESS))
518  {
519  size_t buflen = 1 + ((props == NULL) ? 0 : MQTTProperties_len(props));
520  char *buf = NULL;
521  char *ptr = NULL;
522 
523  if ((buf = malloc(buflen)) == NULL)
524  {
525  rc = SOCKET_ERROR;
526  goto exit;
527  }
528  ptr = buf;
529  writeChar(&ptr, reason);
530  if (props)
531  MQTTProperties_write(&ptr, props);
532  if ((rc = MQTTPacket_send(&client->net, header, buf, buflen, 1,
533  client->MQTTVersion)) != TCPSOCKET_INTERRUPTED)
534  free(buf);
535  }
536  else
537  rc = MQTTPacket_send(&client->net, header, NULL, 0, 0, client->MQTTVersion);
538 exit:
539  Log(LOG_PROTOCOL, 28, NULL, client->net.socket, client->clientID, rc);
540  FUNC_EXIT_RC(rc);
541  return rc;
542 }
543 
544 
553 void* MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
554 {
555  Publish* pack = NULL;
556  char* curdata = data;
557  char* enddata = &data[datalen];
558 
559  FUNC_ENTRY;
560  if ((pack = malloc(sizeof(Publish))) == NULL)
561  goto exit;
562  memset(pack, '\0', sizeof(Publish));
563  pack->MQTTVersion = MQTTVersion;
564  pack->header.byte = aHeader;
565  if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == NULL) /* Topic name on which to publish */
566  {
567  free(pack);
568  pack = NULL;
569  goto exit;
570  }
571  if (pack->header.bits.qos > 0) /* Msgid only exists for QoS 1 or 2 */
572  pack->msgId = readInt(&curdata);
573  else
574  pack->msgId = 0;
575  if (MQTTVersion >= MQTTVERSION_5)
576  {
578  pack->properties = props;
579  if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
580  {
581  if (pack->properties.array)
582  free(pack->properties.array);
583  if (pack)
584  free(pack);
585  pack = NULL; /* signal protocol error */
586  goto exit;
587  }
588  }
589  pack->payload = curdata;
590  pack->payloadlen = (int)(datalen-(curdata-data));
591 exit:
592  FUNC_EXIT;
593  return pack;
594 }
595 
596 
602 {
603  FUNC_ENTRY;
604  if (pack->topic != NULL)
605  free(pack->topic);
606  if (pack->MQTTVersion >= MQTTVERSION_5)
608  free(pack);
609  FUNC_EXIT;
610 }
611 
612 
618 {
619  FUNC_ENTRY;
620  if (pack->MQTTVersion >= MQTTVERSION_5)
622  free(pack);
623  FUNC_EXIT;
624 }
625 
626 
636 static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net)
637 {
638  Header header;
639  int rc = SOCKET_ERROR;
640  char *buf = NULL;
641  char *ptr = NULL;
642 
643  FUNC_ENTRY;
644  if ((ptr = buf = malloc(2)) == NULL)
645  goto exit;
646  header.byte = 0;
647  header.bits.type = type;
648  header.bits.dup = dup;
649  if (type == PUBREL)
650  header.bits.qos = 1;
651  writeInt(&ptr, msgid);
652  if ((rc = MQTTPacket_send(net, header, buf, 2, 1, MQTTVersion)) != TCPSOCKET_INTERRUPTED)
653  free(buf);
654 exit:
655  FUNC_EXIT_RC(rc);
656  return rc;
657 }
658 
659 
668 int MQTTPacket_send_puback(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
669 {
670  int rc = 0;
671 
672  FUNC_ENTRY;
673  rc = MQTTPacket_send_ack(MQTTVersion, PUBACK, msgid, 0, net);
674  Log(LOG_PROTOCOL, 12, NULL, net->socket, clientID, msgid, rc);
675  FUNC_EXIT_RC(rc);
676  return rc;
677 }
678 
679 
685 {
686  FUNC_ENTRY;
687  if (pack->MQTTVersion >= MQTTVERSION_5)
689  if (pack->qoss != NULL)
690  ListFree(pack->qoss);
691  free(pack);
692  FUNC_EXIT;
693 }
694 
695 
701 {
702  FUNC_ENTRY;
703  if (pack->MQTTVersion >= MQTTVERSION_5)
704  {
706  if (pack->reasonCodes != NULL)
707  ListFree(pack->reasonCodes);
708  }
709  free(pack);
710  FUNC_EXIT;
711 }
712 
713 
722 int MQTTPacket_send_pubrec(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
723 {
724  int rc = 0;
725 
726  FUNC_ENTRY;
727  rc = MQTTPacket_send_ack(MQTTVersion, PUBREC, msgid, 0, net);
728  Log(LOG_PROTOCOL, 13, NULL, net->socket, clientID, msgid, rc);
729  FUNC_EXIT_RC(rc);
730  return rc;
731 }
732 
733 
743 int MQTTPacket_send_pubrel(int MQTTVersion, int msgid, int dup, networkHandles* net, const char* clientID)
744 {
745  int rc = 0;
746 
747  FUNC_ENTRY;
748  rc = MQTTPacket_send_ack(MQTTVersion, PUBREL, msgid, dup, net);
749  Log(LOG_PROTOCOL, 16, NULL, net->socket, clientID, msgid, rc);
750  FUNC_EXIT_RC(rc);
751  return rc;
752 }
753 
754 
763 int MQTTPacket_send_pubcomp(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
764 {
765  int rc = 0;
766 
767  FUNC_ENTRY;
768  rc = MQTTPacket_send_ack(MQTTVersion, PUBCOMP, msgid, 0, net);
769  Log(LOG_PROTOCOL, 18, NULL, net->socket, clientID, msgid, rc);
770  FUNC_EXIT_RC(rc);
771  return rc;
772 }
773 
774 
783 void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
784 {
785  Ack* pack = NULL;
786  char* curdata = data;
787  char* enddata = &data[datalen];
788 
789  FUNC_ENTRY;
790  if ((pack = malloc(sizeof(Ack))) == NULL)
791  goto exit;
792  pack->MQTTVersion = MQTTVersion;
793  pack->header.byte = aHeader;
794  if (pack->header.bits.type != DISCONNECT)
795  pack->msgId = readInt(&curdata);
796  if (MQTTVersion >= MQTTVERSION_5)
797  {
799 
800  pack->rc = MQTTREASONCODE_SUCCESS;
801  pack->properties = props;
802 
803  if (datalen > 2)
804  pack->rc = readChar(&curdata); /* reason code */
805 
806  if (datalen > 3)
807  {
808  if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
809  {
810  if (pack->properties.array)
811  free(pack->properties.array);
812  if (pack)
813  free(pack);
814  pack = NULL; /* signal protocol error */
815  goto exit;
816  }
817  }
818  }
819 exit:
820  FUNC_EXIT;
821  return pack;
822 }
823 
824 
835 int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID)
836 {
837  Header header;
838  char *topiclen;
839  int rc = SOCKET_ERROR;
840 
841  FUNC_ENTRY;
842  topiclen = malloc(2);
843  if (topiclen == NULL)
844  goto exit;
845 
846  header.bits.type = PUBLISH;
847  header.bits.dup = dup;
848  header.bits.qos = qos;
849  header.bits.retain = retained;
850  if (qos > 0 || pack->MQTTVersion >= 5)
851  {
852  int buflen = ((qos > 0) ? 2 : 0) + ((pack->MQTTVersion >= 5) ? MQTTProperties_len(&pack->properties) : 0);
853  char *ptr = NULL;
854  char* bufs[4] = {topiclen, pack->topic, NULL, pack->payload};
855  size_t lens[4] = {2, strlen(pack->topic), buflen, pack->payloadlen};
856  int frees[4] = {1, 0, 1, 0};
857  PacketBuffers packetbufs = {4, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
858 
859  bufs[2] = ptr = malloc(buflen);
860  if (ptr == NULL)
861  goto exit_free;
862  if (qos > 0)
863  writeInt(&ptr, pack->msgId);
864  if (pack->MQTTVersion >= 5)
865  MQTTProperties_write(&ptr, &pack->properties);
866 
867  ptr = topiclen;
868  writeInt(&ptr, (int)lens[1]);
869  rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
870  if (rc != TCPSOCKET_INTERRUPTED)
871  free(bufs[2]);
872  memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
873  }
874  else
875  {
876  char* ptr = topiclen;
877  char* bufs[3] = {topiclen, pack->topic, pack->payload};
878  size_t lens[3] = {2, strlen(pack->topic), pack->payloadlen};
879  int frees[3] = {1, 0, 0};
880  PacketBuffers packetbufs = {3, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
881 
882  writeInt(&ptr, (int)lens[1]);
883  rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
884  memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
885  }
886  if (qos == 0)
887  Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, retained, rc, pack->payloadlen,
888  min(20, pack->payloadlen), pack->payload);
889  else
890  Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, qos, retained, rc, pack->payloadlen,
891  min(20, pack->payloadlen), pack->payload);
892 exit_free:
893  if (rc != TCPSOCKET_INTERRUPTED)
894  free(topiclen);
895 exit:
896  FUNC_EXIT_RC(rc);
897  return rc;
898 }
899 
900 
906 {
907  FUNC_ENTRY;
908  if (pack->header.bits.type == PUBLISH)
910  /*else if (pack->header.type == SUBSCRIBE)
911  MQTTPacket_freeSubscribe((Subscribe*)pack, 1);
912  else if (pack->header.type == UNSUBSCRIBE)
913  MQTTPacket_freeUnsubscribe((Unsubscribe*)pack);*/
914  else
915  free(pack);
916  FUNC_EXIT;
917 }
918 
919 
925 void writeInt4(char** pptr, int anInt)
926 {
927  **pptr = (char)(anInt / 16777216);
928  (*pptr)++;
929  anInt %= 16777216;
930  **pptr = (char)(anInt / 65536);
931  (*pptr)++;
932  anInt %= 65536;
933  **pptr = (char)(anInt / 256);
934  (*pptr)++;
935  **pptr = (char)(anInt % 256);
936  (*pptr)++;
937 }
938 
939 
945 int readInt4(char** pptr)
946 {
947  unsigned char* ptr = (unsigned char*)*pptr;
948  int value = 16777216*(*ptr) + 65536*(*(ptr+1)) + 256*(*(ptr+2)) + (*(ptr+3));
949  *pptr += 4;
950  return value;
951 }
952 
953 
954 void writeMQTTLenString(char** pptr, MQTTLenString lenstring)
955 {
956  writeInt(pptr, lenstring.len);
957  memcpy(*pptr, lenstring.data, lenstring.len);
958  *pptr += lenstring.len;
959 }
960 
961 
962 int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata)
963 {
964  int len = 0;
965 
966  /* the first two bytes are the length of the string */
967  if (enddata - (*pptr) > 1) /* enough length to read the integer? */
968  {
969  lenstring->len = readInt(pptr); /* increments pptr to point past length */
970  if (&(*pptr)[lenstring->len] <= enddata)
971  {
972  lenstring->data = (char*)*pptr;
973  *pptr += lenstring->len;
974  len = 2 + lenstring->len;
975  }
976  }
977  return len;
978 }
979 
980 /*
981 if (prop->value.integer4 >= 0 && prop->value.integer4 <= 127)
982  len = 1;
983 else if (prop->value.integer4 >= 128 && prop->value.integer4 <= 16383)
984  len = 2;
985 else if (prop->value.integer4 >= 16384 && prop->value.integer4 < 2097151)
986  len = 3;
987 else if (prop->value.integer4 >= 2097152 && prop->value.integer4 < 268435455)
988  len = 4;
989 */
990 int MQTTPacket_VBIlen(int rem_len)
991 {
992  int rc = 0;
993 
994  if (rem_len < 128)
995  rc = 1;
996  else if (rem_len < 16384)
997  rc = 2;
998  else if (rem_len < 2097152)
999  rc = 3;
1000  else
1001  rc = 4;
1002  return rc;
1003 }
1004 
1005 
1012 int MQTTPacket_VBIdecode(int (*getcharfn)(char*, int), unsigned int* value)
1013 {
1014  char c;
1015  int multiplier = 1;
1016  int len = 0;
1017 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
1018 
1019  *value = 0;
1020  do
1021  {
1022  int rc = MQTTPACKET_READ_ERROR;
1023 
1025  {
1026  rc = MQTTPACKET_READ_ERROR; /* bad data */
1027  goto exit;
1028  }
1029  rc = (*getcharfn)(&c, 1);
1030  if (rc != 1)
1031  goto exit;
1032  *value += (c & 127) * multiplier;
1033  multiplier *= 128;
1034  } while ((c & 128) != 0);
1035 exit:
1036  return len;
1037 }
1038 
1039 
1040 static char* bufptr;
1041 
1042 int bufchar(char* c, int count)
1043 {
1044  int i;
1045 
1046  for (i = 0; i < count; ++i)
1047  *c = *bufptr++;
1048  return count;
1049 }
1050 
1051 
1052 int MQTTPacket_decodeBuf(char* buf, unsigned int* value)
1053 {
1054  bufptr = buf;
1055  return MQTTPacket_VBIdecode(bufchar, value);
1056 }
1057 
static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net)
Definition: MQTTPacket.c:636
void MQTTPacket_freeSuback(Suback *pack)
Definition: MQTTPacket.c:684
MQTTProperties properties
Definition: MQTTPacket.h:176
void WebSocket_framePosSeekTo(size_t pos)
Definition: WebSocket.c:647
#define LOG_PROTOCOL
Definition: Log.h:64
void MQTTPacket_freeAck(Ack *pack)
Definition: MQTTPacket.c:617
int MQTTProperties_read(MQTTProperties *properties, char **pptr, char *enddata)
enum MQTTPropertyCodes value
#define TRACE_MIN
Definition: Log.h:66
int count
Definition: Socket.h:98
char * topic
Definition: MQTTPacket.h:200
void * MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char *data, size_t datalen)
int MQTTProperties_len(MQTTProperties *props)
int MQTTPacket_VBIlen(int rem_len)
Definition: MQTTPacket.c:990
int MQTTProperties_write(char **pptr, const MQTTProperties *properties)
MQTTReasonCodes
#define FUNC_EXIT
Definition: StackTrace.h:59
MQTTProperties props
Definition: paho_c_pub.c:54
void * MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char *data, size_t datalen)
Definition: MQTTPacket.c:553
#define min(A, B)
Definition: MQTTPacket.c:44
int readInt(char **pptr)
Definition: MQTTPacket.c:359
int msgId
Definition: MQTTPacket.h:217
int MQTTVersion
Definition: Clients.h:146
int msgId
Definition: MQTTPacket.h:202
bool retain
Definition: MQTTPacket.h:77
void MQTTProperties_free(MQTTProperties *props)
char byte
Definition: MQTTPacket.h:65
MQTTClient d
Definition: test10.c:1656
unsigned char readChar(char **pptr)
Definition: MQTTPacket.c:425
void *(* pf)(int, unsigned char, char *, size_t)
Definition: MQTTPacket.h:32
int MQTTPacket_send_puback(int MQTTVersion, int msgid, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:668
#define MAX_NO_OF_REMAINING_LENGTH_BYTES
struct Header::@59 bits
void MQTTPacket_free_packet(MQTTPacket *pack)
Definition: MQTTPacket.c:905
char * payload
Definition: MQTTPacket.h:203
size_t WebSocket_framePos()
Definition: WebSocket.c:634
int MQTTVersion
Definition: MQTTPacket.h:175
int MQTTPacket_send_disconnect(Clients *client, enum MQTTReasonCodes reason, MQTTProperties *props)
Definition: MQTTPacket.c:508
char * WebSocket_getdata(networkHandles *net, size_t bytes, size_t *actual_len)
receives data from a socket. It should receive all data from the socket that is immediately available...
Definition: WebSocket.c:669
#define malloc(x)
Definition: Heap.h:41
void MQTTPacket_freePublish(Publish *pack)
Definition: MQTTPacket.c:601
char * readUTF(char **pptr, char *enddata)
Definition: MQTTPacket.c:413
void writeInt(char **pptr, int anInt)
Definition: MQTTPacket.c:450
static l_noret error(LoadState *S, const char *why)
Definition: lundump.c:40
int topiclen
Definition: MQTTPacket.h:201
void writeMQTTLenString(char **pptr, MQTTLenString lenstring)
Definition: MQTTPacket.c:954
Header header
Definition: MQTTPacket.h:164
MQTTProperties properties
Definition: MQTTPacket.h:189
char ** buffers
Definition: Socket.h:99
size_t * buflens
Definition: Socket.h:100
void writeChar(char **pptr, char c)
Definition: MQTTPacket.c:438
#define free(x)
Definition: Heap.h:55
unsigned int type
Definition: MQTTPacket.h:80
int MQTTPacket_send_pubcomp(int MQTTVersion, int msgid, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:763
networkHandles net
Definition: Clients.h:129
const char ** MQTTClient_packet_names
Definition: MQTTPacket.c:57
int MQTTPacket_decode(networkHandles *net, size_t *value)
Definition: MQTTPacket.c:325
void * MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char *data, size_t datalen)
constexpr size_t count()
Definition: core.h:960
int bufchar(char *c, int count)
Definition: MQTTPacket.c:1042
#define FUNC_EXIT_RC(x)
Definition: StackTrace.h:63
#define SOCKET_ERROR
Definition: Socket.h:76
int MQTTPacket_decodeBuf(char *buf, unsigned int *value)
Definition: MQTTPacket.c:1052
int MQTTPacket_encode(char *buf, size_t length)
Definition: MQTTPacket.c:297
List * reasonCodes
Definition: MQTTPacket.h:190
int readInt4(char **pptr)
Definition: MQTTPacket.c:945
bool dup
Definition: MQTTPacket.h:79
static char * readUTFlen(char **pptr, char *enddata, int *len)
Definition: MQTTPacket.c:380
pf new_packets[]
Definition: MQTTPacket.c:73
int payloadlen
Definition: MQTTPacket.h:204
START_TIME_TYPE lastReceived
Definition: Clients.h:82
int MQTTLenStringRead(MQTTLenString *lenstring, char **pptr, char *enddata)
Definition: MQTTPacket.c:962
const char * MQTTPacket_name(int ptype)
Definition: MQTTPacket.c:65
void writeData(char **pptr, const void *data, int datalen)
Definition: MQTTPacket.c:479
int MQTTPacket_send_pubrec(int MQTTVersion, int msgid, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:722
int MQTTPacket_send_pubrel(int MQTTVersion, int msgid, int dup, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:743
void MQTTPacket_freeUnsuback(Unsuback *pack)
Definition: MQTTPacket.c:700
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
Definition: Log.c:417
void * MQTTPacket_Factory(int MQTTVersion, networkHandles *net, int *error)
Definition: MQTTPacket.c:103
unsigned char rc
Definition: MQTTPacket.h:218
int qos
Definition: test6.c:56
MQTTProperties properties
Definition: MQTTPacket.h:206
int WebSocket_getch(networkHandles *net, char *c)
receives 1 byte from a socket
Definition: WebSocket.c:589
Definition: Log.h:41
void writeUTF(char **pptr, const char *string)
Definition: MQTTPacket.c:464
void * MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char *data, size_t datalen)
List * qoss
Definition: MQTTPacket.h:177
void writeInt4(char **pptr, int anInt)
Definition: MQTTPacket.c:925
MQTTAsync client
Definition: test6.c:276
MQTTProperty * array
int MQTTVersion
Definition: MQTTPacket.h:219
#define FUNC_ENTRY
Definition: StackTrace.h:55
int MQTTPacket_VBIdecode(int(*getcharfn)(char *, int), unsigned int *value)
Definition: MQTTPacket.c:1012
START_TIME_TYPE lastSent
Definition: Clients.h:81
int WebSocket_putdatas(networkHandles *net, char **buf0, size_t *buf0len, PacketBuffers *bufs)
Definition: WebSocket.c:938
int MQTTVersion
Definition: MQTTPacket.h:188
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
uint8_t mask[4]
Definition: Socket.h:102
int * frees
Definition: Socket.h:101
MQTTClient c
Definition: test10.c:1656
int MQTTPersistence_putPacket(int socket, char *buf0, size_t buf0len, int count, char **buffers, size_t *buflens, int htype, int msgId, int scr, int MQTTVersion)
int retained
Definition: test6.c:57
const void * ptr(const T *p)
Definition: format.h:3610
START_TIME_TYPE MQTTTime_now(void)
Definition: MQTTTime.c:66
void * MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char *data, size_t datalen)
Definition: MQTTPacket.c:783
Header header
Definition: MQTTPacket.h:216
int MQTTPacket_send(networkHandles *net, Header header, char *buffer, size_t buflen, int freeData, int MQTTVersion)
Definition: MQTTPacket.c:190
Header header
Definition: MQTTPacket.h:199
char * clientID
Definition: Clients.h:119
#define TCPSOCKET_COMPLETE
Definition: Socket.h:73
int MQTTPacket_sends(networkHandles *net, Header header, PacketBuffers *bufs, int MQTTVersion)
Definition: MQTTPacket.c:250
static const char * packet_names[]
Definition: MQTTPacket.c:50
int MQTTVersion
Definition: MQTTPacket.h:205
MQTTProperties properties
Definition: MQTTPacket.h:220
dictionary data
Definition: mqtt_test.py:22
void ListFree(List *aList)
Definition: LinkedList.c:381
enum MQTTReasonCodes rc
Definition: test10.c:1112
int MQTTPacket_send_publish(Publish *pack, int dup, int qos, int retained, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:835
static char * bufptr
Definition: MQTTPacket.c:1040
unsigned int qos
Definition: MQTTPacket.h:78
uint8_t mask[4]
Definition: MQTTPacket.h:207
#define TCPSOCKET_INTERRUPTED
Definition: Socket.h:79
void * MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char *data, size_t datalen)
Definition: MQTTPacket.c:495
int len
Definition: utf-8.c:46
#define MQTTProperties_initializer


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:09