MQTTProtocolClient.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs - fix for bug 413429 - connectionLost not called
16  * Ian Craggs - fix for bug 421103 - trying to write to same socket, in retry
17  * Rong Xiang, Ian Craggs - C++ compatibility
18  * Ian Craggs - turn off DUP flag for PUBREL - MQTT 3.1.1
19  * Ian Craggs - ensure that acks are not sent if write is outstanding on socket
20  * Ian Craggs - MQTT 5.0 support
21  *******************************************************************************/
22 
31 #include <stdlib.h>
32 #include <string.h>
33 #include <stdint.h>
34 
35 #include "MQTTProtocolClient.h"
36 #if !defined(NO_PERSISTENCE)
37 #include "MQTTPersistence.h"
38 #endif
39 #include "SocketBuffer.h"
40 #include "StackTrace.h"
41 #include "Heap.h"
42 
43 #if !defined(min)
44 #define min(A,B) ( (A) < (B) ? (A):(B))
45 #endif
46 
47 extern MQTTProtocol state;
48 extern ClientStates* bstate;
49 
50 static void MQTTProtocol_storeQoS0(Clients* pubclient, Publish* publish);
52  Clients* pubclient,
53  Publish* publish,
54  int qos,
55  int retained);
56 static void MQTTProtocol_retries(START_TIME_TYPE now, Clients* client, int regardless);
57 
58 
65 int messageIDCompare(void* a, void* b)
66 {
67  Messages* msg = (Messages*)a;
68  return msg->msgid == *(int*)b;
69 }
70 
71 
79 {
80  int start_msgid = client->msgID;
81  int msgid = start_msgid;
82 
83  FUNC_ENTRY;
84  msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
85  while (ListFindItem(client->outboundMsgs, &msgid, messageIDCompare) != NULL)
86  {
87  msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
88  if (msgid == start_msgid)
89  { /* we've tried them all - none free */
90  msgid = 0;
91  break;
92  }
93  }
94  if (msgid != 0)
95  client->msgID = msgid;
96  FUNC_EXIT_RC(msgid);
97  return msgid;
98 }
99 
100 
101 static void MQTTProtocol_storeQoS0(Clients* pubclient, Publish* publish)
102 {
103  int len;
104  pending_write* pw = NULL;
105 
106  FUNC_ENTRY;
107  /* store the publication until the write is finished */
108  if ((pw = malloc(sizeof(pending_write))) == NULL)
109  goto exit;
110  Log(TRACE_MIN, 12, NULL);
111  if ((pw->p = MQTTProtocol_storePublication(publish, &len)) == NULL)
112  {
113  free(pw);
114  goto exit;
115  }
116  pw->socket = pubclient->net.socket;
117  if (!ListAppend(&(state.pending_writes), pw, sizeof(pending_write)+len))
118  {
119  free(pw->p);
120  free(pw);
121  goto exit;
122  }
123  /* we don't copy QoS 0 messages unless we have to, so now we have to tell the socket buffer where
124  the saved copy is */
125  if (SocketBuffer_updateWrite(pw->socket, pw->p->topic, pw->p->payload) == NULL)
126  Log(LOG_SEVERE, 0, "Error updating write");
127 exit:
128  FUNC_EXIT;
129 }
130 
131 
140 static int MQTTProtocol_startPublishCommon(Clients* pubclient, Publish* publish, int qos, int retained)
141 {
142  int rc = TCPSOCKET_COMPLETE;
143 
144  FUNC_ENTRY;
145  rc = MQTTPacket_send_publish(publish, 0, qos, retained, &pubclient->net, pubclient->clientID);
146  if (qos == 0 && rc == TCPSOCKET_INTERRUPTED)
147  MQTTProtocol_storeQoS0(pubclient, publish);
148  FUNC_EXIT_RC(rc);
149  return rc;
150 }
151 
152 
162 int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int retained, Messages** mm)
163 {
164  Publish p = *publish;
165  int rc = 0;
166 
167  FUNC_ENTRY;
168  if (qos > 0)
169  {
170  *mm = MQTTProtocol_createMessage(publish, mm, qos, retained, 0);
171  ListAppend(pubclient->outboundMsgs, *mm, (*mm)->len);
172  /* we change these pointers to the saved message location just in case the packet could not be written
173  entirely; the socket buffer will use these locations to finish writing the packet */
174  p.payload = (*mm)->publish->payload;
175  p.topic = (*mm)->publish->topic;
176  p.properties = (*mm)->properties;
177  p.MQTTVersion = (*mm)->MQTTVersion;
178  }
179  rc = MQTTProtocol_startPublishCommon(pubclient, &p, qos, retained);
180  if (qos > 0)
181  memcpy((*mm)->publish->mask, p.mask, sizeof((*mm)->publish->mask));
182  FUNC_EXIT_RC(rc);
183  return rc;
184 }
185 
186 
196 Messages* MQTTProtocol_createMessage(Publish* publish, Messages **mm, int qos, int retained, int allocatePayload)
197 {
198  Messages* m = malloc(sizeof(Messages));
199 
200  FUNC_ENTRY;
201  if (!m)
202  goto exit;
203  m->len = sizeof(Messages);
204  if (*mm == NULL || (*mm)->publish == NULL)
205  {
206  int len1;
207  *mm = m;
208  if ((m->publish = MQTTProtocol_storePublication(publish, &len1)) == NULL)
209  {
210  free(m);
211  goto exit;
212  }
213  m->len += len1;
214  if (allocatePayload)
215  {
216  char *temp = m->publish->payload;
217 
218  if ((m->publish->payload = malloc(m->publish->payloadlen)) == NULL)
219  {
220  free(m);
221  goto exit;
222  }
223  memcpy(m->publish->payload, temp, m->publish->payloadlen);
224  }
225  }
226  else /* this is now never used, I think */
227  {
228  ++(((*mm)->publish)->refcount);
229  m->publish = (*mm)->publish;
230  }
231  m->msgid = publish->msgId;
232  m->qos = qos;
233  m->retain = retained;
234  m->MQTTVersion = publish->MQTTVersion;
235  if (m->MQTTVersion >= 5)
236  m->properties = MQTTProperties_copy(&publish->properties);
237  m->lastTouch = MQTTTime_now();
238  if (qos == 2)
239  m->nextMessageType = PUBREC;
240 exit:
241  FUNC_EXIT;
242  return m;
243 }
244 
245 
253 {
254  Publications* p = malloc(sizeof(Publications));
255 
256  FUNC_ENTRY;
257  if (!p)
258  goto exit;
259  p->refcount = 1;
260  *len = (int)strlen(publish->topic)+1;
261  p->topic = publish->topic;
262  publish->topic = NULL;
263  *len += sizeof(Publications);
264  p->topiclen = publish->topiclen;
265  p->payloadlen = publish->payloadlen;
266  p->payload = publish->payload;
267  publish->payload = NULL;
268  *len += publish->payloadlen;
269  memcpy(p->mask, publish->mask, sizeof(p->mask));
270 
271  if ((ListAppend(&(state.publications), p, *len)) == NULL)
272  {
273  free(p);
274  p = NULL;
275  }
276 exit:
277  FUNC_EXIT;
278  return p;
279 }
280 
286 {
287  FUNC_ENTRY;
288  if (p && --(p->refcount) == 0)
289  {
290  free(p->payload);
291  free(p->topic);
292  ListRemove(&(state.publications), p);
293  }
294  FUNC_EXIT;
295 }
296 
305 int MQTTProtocol_handlePublishes(void* pack, int sock)
306 {
307  Publish* publish = (Publish*)pack;
308  Clients* client = NULL;
309  char* clientid = NULL;
310  int rc = TCPSOCKET_COMPLETE;
311 
312  FUNC_ENTRY;
313  client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
314  clientid = client->clientID;
315  Log(LOG_PROTOCOL, 11, NULL, sock, clientid, publish->msgId, publish->header.bits.qos,
316  publish->header.bits.retain, publish->payloadlen, min(20, publish->payloadlen), publish->payload);
317 
318  if (publish->header.bits.qos == 0)
319  Protocol_processPublication(publish, client, 1);
320  else if (!Socket_noPendingWrites(sock))
321  rc = SOCKET_ERROR; /* queue acks? */
322  else if (publish->header.bits.qos == 1)
323  {
324  /* send puback before processing the publications because a lot of return publications could fill up the socket buffer */
325  rc = MQTTPacket_send_puback(publish->MQTTVersion, publish->msgId, &client->net, client->clientID);
326  /* if we get a socket error from sending the puback, should we ignore the publication? */
327  Protocol_processPublication(publish, client, 1);
328  }
329  else if (publish->header.bits.qos == 2)
330  {
331  /* store publication in inbound list */
332  int len;
333  int already_received = 0;
334  ListElement* listElem = NULL;
335  Messages* m = malloc(sizeof(Messages));
336  Publications* p = NULL;
337  if (!m)
338  {
339  rc = PAHO_MEMORY_ERROR;
340  goto exit;
341  }
342  p = MQTTProtocol_storePublication(publish, &len);
343 
344  m->publish = p;
345  m->msgid = publish->msgId;
346  m->qos = publish->header.bits.qos;
347  m->retain = publish->header.bits.retain;
348  m->MQTTVersion = publish->MQTTVersion;
349  if (m->MQTTVersion >= MQTTVERSION_5)
350  m->properties = MQTTProperties_copy(&publish->properties);
351  m->nextMessageType = PUBREL;
352  if ((listElem = ListFindItem(client->inboundMsgs, &(m->msgid), messageIDCompare)) != NULL)
353  { /* discard queued publication with same msgID that the current incoming message */
354  Messages* msg = (Messages*)(listElem->content);
356  if (msg->MQTTVersion >= MQTTVERSION_5)
358  ListInsert(client->inboundMsgs, m, sizeof(Messages) + len, listElem);
359  ListRemove(client->inboundMsgs, msg);
360  already_received = 1;
361  } else
362  ListAppend(client->inboundMsgs, m, sizeof(Messages) + len);
363  rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID);
364  if (m->MQTTVersion >= MQTTVERSION_5 && already_received == 0)
365  {
366  Publish publish1;
367 
368  publish1.header.bits.qos = m->qos;
369  publish1.header.bits.retain = m->retain;
370  publish1.msgId = m->msgid;
371  publish1.topic = m->publish->topic;
372  publish1.topiclen = m->publish->topiclen;
373  publish1.payload = m->publish->payload;
374  publish1.payloadlen = m->publish->payloadlen;
375  publish1.MQTTVersion = m->MQTTVersion;
376  publish1.properties = m->properties;
377 
378  Protocol_processPublication(&publish1, client, 1);
379  ListRemove(&(state.publications), m->publish);
380  m->publish = NULL;
381  } else
382  { /* allocate and copy payload data as it's needed for pubrel.
383  For other cases, it's done in Protocol_processPublication */
384  char *temp = m->publish->payload;
385 
386  if ((m->publish->payload = malloc(m->publish->payloadlen)) == NULL)
387  {
388  rc = PAHO_MEMORY_ERROR;
389  goto exit;
390  }
391  memcpy(m->publish->payload, temp, m->publish->payloadlen);
392  }
393  publish->topic = NULL;
394  }
395 exit:
396  MQTTPacket_freePublish(publish);
397  FUNC_EXIT_RC(rc);
398  return rc;
399 }
400 
407 int MQTTProtocol_handlePubacks(void* pack, int sock)
408 {
409  Puback* puback = (Puback*)pack;
410  Clients* client = NULL;
411  int rc = TCPSOCKET_COMPLETE;
412 
413  FUNC_ENTRY;
414  client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
415  Log(LOG_PROTOCOL, 14, NULL, sock, client->clientID, puback->msgId);
416 
417  /* look for the message by message id in the records of outbound messages for this client */
418  if (ListFindItem(client->outboundMsgs, &(puback->msgId), messageIDCompare) == NULL)
419  Log(TRACE_MIN, 3, NULL, "PUBACK", client->clientID, puback->msgId);
420  else
421  {
422  Messages* m = (Messages*)(client->outboundMsgs->current->content);
423  if (m->qos != 1)
424  Log(TRACE_MIN, 4, NULL, "PUBACK", client->clientID, puback->msgId, m->qos);
425  else
426  {
427  Log(TRACE_MIN, 6, NULL, "PUBACK", client->clientID, puback->msgId);
428  #if !defined(NO_PERSISTENCE)
429  rc = MQTTPersistence_remove(client,
431  m->qos, puback->msgId);
432  #endif
434  if (m->MQTTVersion >= MQTTVERSION_5)
436  ListRemove(client->outboundMsgs, m);
437  }
438  }
439  if (puback->MQTTVersion >= MQTTVERSION_5)
441  free(pack);
442  FUNC_EXIT_RC(rc);
443  return rc;
444 }
445 
446 
453 int MQTTProtocol_handlePubrecs(void* pack, int sock)
454 {
455  Pubrec* pubrec = (Pubrec*)pack;
456  Clients* client = NULL;
457  int rc = TCPSOCKET_COMPLETE;
458 
459  FUNC_ENTRY;
460  client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
461  Log(LOG_PROTOCOL, 15, NULL, sock, client->clientID, pubrec->msgId);
462 
463  /* look for the message by message id in the records of outbound messages for this client */
464  client->outboundMsgs->current = NULL;
465  if (ListFindItem(client->outboundMsgs, &(pubrec->msgId), messageIDCompare) == NULL)
466  {
467  if (pubrec->header.bits.dup == 0)
468  Log(TRACE_MIN, 3, NULL, "PUBREC", client->clientID, pubrec->msgId);
469  }
470  else
471  {
472  Messages* m = (Messages*)(client->outboundMsgs->current->content);
473  if (m->qos != 2)
474  {
475  if (pubrec->header.bits.dup == 0)
476  Log(TRACE_MIN, 4, NULL, "PUBREC", client->clientID, pubrec->msgId, m->qos);
477  }
478  else if (m->nextMessageType != PUBREC)
479  {
480  if (pubrec->header.bits.dup == 0)
481  Log(TRACE_MIN, 5, NULL, "PUBREC", client->clientID, pubrec->msgId);
482  }
483  else
484  {
485  if (pubrec->MQTTVersion >= MQTTVERSION_5 && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
486  {
487  Log(TRACE_MIN, -1, "Pubrec error %d received for client %s msgid %d, not sending PUBREL",
488  pubrec->rc, client->clientID, pubrec->msgId);
489  #if !defined(NO_PERSISTENCE)
490  rc = MQTTPersistence_remove(client,
492  m->qos, pubrec->msgId);
493  #endif
495  if (m->MQTTVersion >= MQTTVERSION_5)
497  ListRemove(client->outboundMsgs, m);
498  (++state.msgs_sent);
499  }
500  else
501  {
502  rc = MQTTPacket_send_pubrel(pubrec->MQTTVersion, pubrec->msgId, 0, &client->net, client->clientID);
504  m->lastTouch = MQTTTime_now();
505  }
506  }
507  }
508  if (pubrec->MQTTVersion >= MQTTVERSION_5)
510  free(pack);
511  FUNC_EXIT_RC(rc);
512  return rc;
513 }
514 
515 
522 int MQTTProtocol_handlePubrels(void* pack, int sock)
523 {
524  Pubrel* pubrel = (Pubrel*)pack;
525  Clients* client = NULL;
526  int rc = TCPSOCKET_COMPLETE;
527 
528  FUNC_ENTRY;
529  client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
530  Log(LOG_PROTOCOL, 17, NULL, sock, client->clientID, pubrel->msgId);
531 
532  /* look for the message by message id in the records of inbound messages for this client */
533  if (ListFindItem(client->inboundMsgs, &(pubrel->msgId), messageIDCompare) == NULL)
534  {
535  if (pubrel->header.bits.dup == 0)
536  Log(TRACE_MIN, 3, NULL, "PUBREL", client->clientID, pubrel->msgId);
537  else if (!Socket_noPendingWrites(sock))
538  rc = SOCKET_ERROR; /* queue acks? */
539  else
540  /* Apparently this is "normal" behaviour, so we don't need to issue a warning */
541  rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID);
542  }
543  else
544  {
545  Messages* m = (Messages*)(client->inboundMsgs->current->content);
546  if (m->qos != 2)
547  Log(TRACE_MIN, 4, NULL, "PUBREL", client->clientID, pubrel->msgId, m->qos);
548  else if (m->nextMessageType != PUBREL)
549  Log(TRACE_MIN, 5, NULL, "PUBREL", client->clientID, pubrel->msgId);
550  else if (!Socket_noPendingWrites(sock))
551  rc = SOCKET_ERROR; /* queue acks? */
552  else
553  {
554  Publish publish;
555 
556  memset(&publish, '\0', sizeof(publish));
557  /* send pubcomp before processing the publications because a lot of return publications could fill up the socket buffer */
558  rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID);
559  publish.header.bits.qos = m->qos;
560  publish.header.bits.retain = m->retain;
561  publish.msgId = m->msgid;
562  if (m->publish)
563  {
564  publish.topic = m->publish->topic;
565  publish.topiclen = m->publish->topiclen;
566  publish.payload = m->publish->payload;
567  publish.payloadlen = m->publish->payloadlen;
568  }
569  publish.MQTTVersion = m->MQTTVersion;
570  if (publish.MQTTVersion >= MQTTVERSION_5)
571  publish.properties = m->properties;
572  else
573  Protocol_processPublication(&publish, client, 0); /* only for 3.1.1 and lower */
574  #if !defined(NO_PERSISTENCE)
575  rc += MQTTPersistence_remove(client,
577  m->qos, pubrel->msgId);
578  #endif
579  if (m->MQTTVersion >= MQTTVERSION_5)
581  if (m->publish)
582  ListRemove(&(state.publications), m->publish);
583  ListRemove(client->inboundMsgs, m);
584  ++(state.msgs_received);
585  }
586  }
587  if (pubrel->MQTTVersion >= MQTTVERSION_5)
589  free(pack);
590  FUNC_EXIT_RC(rc);
591  return rc;
592 }
593 
594 
601 int MQTTProtocol_handlePubcomps(void* pack, int sock)
602 {
603  Pubcomp* pubcomp = (Pubcomp*)pack;
604  Clients* client = NULL;
605  int rc = TCPSOCKET_COMPLETE;
606 
607  FUNC_ENTRY;
608  client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
609  Log(LOG_PROTOCOL, 19, NULL, sock, client->clientID, pubcomp->msgId);
610 
611  /* look for the message by message id in the records of outbound messages for this client */
612  if (ListFindItem(client->outboundMsgs, &(pubcomp->msgId), messageIDCompare) == NULL)
613  {
614  if (pubcomp->header.bits.dup == 0)
615  Log(TRACE_MIN, 3, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
616  }
617  else
618  {
619  Messages* m = (Messages*)(client->outboundMsgs->current->content);
620  if (m->qos != 2)
621  Log(TRACE_MIN, 4, NULL, "PUBCOMP", client->clientID, pubcomp->msgId, m->qos);
622  else
623  {
624  if (m->nextMessageType != PUBCOMP)
625  Log(TRACE_MIN, 5, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
626  else
627  {
628  Log(TRACE_MIN, 6, NULL, "PUBCOMP", client->clientID, pubcomp->msgId);
629  #if !defined(NO_PERSISTENCE)
630  rc = MQTTPersistence_remove(client,
632  m->qos, pubcomp->msgId);
633  if (rc != 0)
634  Log(LOG_ERROR, -1, "Error removing PUBCOMP for client id %s msgid %d from persistence", client->clientID, pubcomp->msgId);
635  #endif
637  if (m->MQTTVersion >= MQTTVERSION_5)
639  ListRemove(client->outboundMsgs, m);
640  (++state.msgs_sent);
641  }
642  }
643  }
644  if (pubcomp->MQTTVersion >= MQTTVERSION_5)
645  MQTTProperties_free(&pubcomp->properties);
646  free(pack);
647  FUNC_EXIT_RC(rc);
648  return rc;
649 }
650 
651 
657 {
658  ListElement* current = NULL;
659 
660  FUNC_ENTRY;
661  ListNextElement(bstate->clients, &current);
662  while (current)
663  {
664  Clients* client = (Clients*)(current->content);
665  ListNextElement(bstate->clients, &current);
666 
667  if (client->connected == 0 || client->keepAliveInterval == 0)
668  continue;
669 
670  if (client->ping_outstanding == 1)
671  {
672  if (MQTTTime_difftime(now, client->net.lastPing) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000))
673  {
674  Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
675  MQTTProtocol_closeSession(client, 1);
676  }
677  }
678  else if (MQTTTime_difftime(now, client->net.lastSent) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000) ||
679  MQTTTime_difftime(now, client->net.lastReceived) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000))
680  {
681  if (Socket_noPendingWrites(client->net.socket))
682  {
683  if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE)
684  {
685  Log(TRACE_PROTOCOL, -1, "Error sending PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
686  MQTTProtocol_closeSession(client, 1);
687  }
688  else
689  {
690  client->net.lastPing = now;
691  client->ping_outstanding = 1;
692  }
693  }
694  }
695  }
696  FUNC_EXIT;
697 }
698 
699 
706 static void MQTTProtocol_retries(START_TIME_TYPE now, Clients* client, int regardless)
707 {
708  ListElement* outcurrent = NULL;
709 
710  FUNC_ENTRY;
711 
712  if (!regardless && client->retryInterval <= 0) /* 0 or -ive retryInterval turns off retry except on reconnect */
713  goto exit;
714 
715  while (client && ListNextElement(client->outboundMsgs, &outcurrent) &&
716  client->connected && client->good && /* client is connected and has no errors */
717  Socket_noPendingWrites(client->net.socket)) /* there aren't any previous packets still stacked up on the socket */
718  {
719  Messages* m = (Messages*)(outcurrent->content);
720  if (regardless || MQTTTime_difftime(now, m->lastTouch) > (DIFF_TIME_TYPE)(max(client->retryInterval, 10) * 1000))
721  {
722  if (m->qos == 1 || (m->qos == 2 && m->nextMessageType == PUBREC))
723  {
724  Publish publish;
725  int rc;
726 
727  Log(TRACE_MIN, 7, NULL, "PUBLISH", client->clientID, client->net.socket, m->msgid);
728  publish.msgId = m->msgid;
729  publish.topic = m->publish->topic;
730  publish.payload = m->publish->payload;
731  publish.payloadlen = m->publish->payloadlen;
732  publish.properties = m->properties;
733  publish.MQTTVersion = m->MQTTVersion;
734  memcpy(publish.mask, m->publish->mask, sizeof(publish.mask));
735  rc = MQTTPacket_send_publish(&publish, 1, m->qos, m->retain, &client->net, client->clientID);
736  memcpy(m->publish->mask, publish.mask, sizeof(m->publish->mask)); /* store websocket mask used in send */
737  if (rc == SOCKET_ERROR)
738  {
739  client->good = 0;
740  Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,
741  Socket_getpeer(client->net.socket));
742  MQTTProtocol_closeSession(client, 1);
743  client = NULL;
744  }
745  else
746  {
747  if (m->qos == 0 && rc == TCPSOCKET_INTERRUPTED)
748  MQTTProtocol_storeQoS0(client, &publish);
749  m->lastTouch = MQTTTime_now();
750  }
751  }
752  else if (m->qos && m->nextMessageType == PUBCOMP)
753  {
754  Log(TRACE_MIN, 7, NULL, "PUBREL", client->clientID, client->net.socket, m->msgid);
755  if (MQTTPacket_send_pubrel(m->MQTTVersion, m->msgid, 0, &client->net, client->clientID) != TCPSOCKET_COMPLETE)
756  {
757  client->good = 0;
758  Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,
759  Socket_getpeer(client->net.socket));
760  MQTTProtocol_closeSession(client, 1);
761  client = NULL;
762  }
763  else
764  m->lastTouch = MQTTTime_now();
765  }
766  /* break; why not do all retries at once? */
767  }
768  }
769 exit:
770  FUNC_EXIT;
771 }
772 
773 
780 void MQTTProtocol_retry(START_TIME_TYPE now, int doRetry, int regardless)
781 {
782  ListElement* current = NULL;
783 
784  FUNC_ENTRY;
785  ListNextElement(bstate->clients, &current);
786  /* look through the outbound message list of each client, checking to see if a retry is necessary */
787  while (current)
788  {
789  Clients* client = (Clients*)(current->content);
790  ListNextElement(bstate->clients, &current);
791  if (client->connected == 0)
792  continue;
793  if (client->good == 0)
794  {
795  MQTTProtocol_closeSession(client, 1);
796  continue;
797  }
798  if (Socket_noPendingWrites(client->net.socket) == 0)
799  continue;
800  if (doRetry)
801  MQTTProtocol_retries(now, client, regardless);
802  }
803  FUNC_EXIT;
804 }
805 
806 
812 {
813  FUNC_ENTRY;
814  /* free up pending message lists here, and any other allocated data */
817  ListFree(client->messageQueue);
818  free(client->clientID);
819  client->clientID = NULL;
820  if (client->will)
821  {
822  free(client->will->payload);
823  free(client->will->topic);
824  free(client->will);
825  client->will = NULL;
826  }
827  if (client->username)
828  free((void*)client->username);
829  if (client->password)
830  free((void*)client->password);
831 #if defined(OPENSSL)
832  if (client->sslopts)
833  {
834  if (client->sslopts->trustStore)
835  free((void*)client->sslopts->trustStore);
836  if (client->sslopts->keyStore)
837  free((void*)client->sslopts->keyStore);
838  if (client->sslopts->privateKey)
839  free((void*)client->sslopts->privateKey);
840  if (client->sslopts->privateKeyPassword)
841  free((void*)client->sslopts->privateKeyPassword);
842  if (client->sslopts->enabledCipherSuites)
843  free((void*)client->sslopts->enabledCipherSuites);
844  if (client->sslopts->struct_version >= 2)
845  {
846  if (client->sslopts->CApath)
847  free((void*)client->sslopts->CApath);
848  }
849  free(client->sslopts);
850  client->sslopts = NULL;
851  }
852 #endif
853  /* don't free the client structure itself... this is done elsewhere */
854  FUNC_EXIT;
855 }
856 
857 
863 {
864  ListElement* current = NULL;
865 
866  FUNC_ENTRY;
867  while (ListNextElement(msgList, &current))
868  {
869  Messages* m = (Messages*)(current->content);
871  if (m->MQTTVersion >= MQTTVERSION_5)
873  }
874  ListEmpty(msgList);
875  FUNC_EXIT;
876 }
877 
878 
884 {
885  FUNC_ENTRY;
887  ListFree(msgList);
888  FUNC_EXIT;
889 }
890 
891 
900 char* MQTTStrncpy(char *dest, const char *src, size_t dest_size)
901 {
902  size_t count = dest_size;
903  char *temp = dest;
904 
905  FUNC_ENTRY;
906  if (dest_size < strlen(src))
907  Log(TRACE_MIN, -1, "the src string is truncated");
908 
909  /* We must copy only the first (dest_size - 1) bytes */
910  while (count > 1 && (*temp++ = *src++))
911  count--;
912 
913  *temp = '\0';
914 
915  FUNC_EXIT;
916  return dest;
917 }
918 
919 
/**
 * Duplicate a string into newly allocated memory.
 * @param src the source string which characters are copied from; may be NULL
 * @return the duplicated, null-terminated string, or NULL if src is NULL or
 *         the allocation fails.  The caller releases it with free().
 */
char* MQTTStrdup(const char* src)
{
    size_t mlen;
    char* temp;

    if (src == NULL)
        return NULL;

    mlen = strlen(src) + 1;
    temp = malloc(mlen);
    if (temp)
        memcpy(temp, src, mlen); /* length is already known; includes the terminator */
    return temp;
}
#define PERSISTENCE_PUBLISH_SENT
void MQTTProtocol_removePublication(Publications *p)
List * messageQueue
Definition: Clients.h:137
int MQTTProtocol_handlePubacks(void *pack, int sock)
Messages * MQTTProtocol_createMessage(Publish *publish, Messages **mm, int qos, int retained, int allocatePayload)
#define LOG_PROTOCOL
Definition: Log.h:64
#define TRACE_MIN
Definition: Log.h:66
#define DIFF_TIME_TYPE
Definition: MQTTTime.h:41
char * topic
Definition: MQTTPacket.h:200
int qos
Definition: Clients.h:55
START_TIME_TYPE lastPing
Definition: Clients.h:83
void MQTTProtocol_freeClient(Clients *client)
#define FUNC_EXIT
Definition: StackTrace.h:59
void MQTTProtocol_freeMessageList(List *msgList)
int MQTTProtocol_handlePubcomps(void *pack, int sock)
int msgId
Definition: MQTTPacket.h:217
int msgId
Definition: MQTTPacket.h:202
unsigned int msgs_received
Definition: MQTTProtocol.h:38
DIFF_TIME_TYPE MQTTTime_difftime(START_TIME_TYPE new, START_TIME_TYPE old)
Definition: MQTTTime.c:99
ListElement * current
Definition: LinkedList.h:69
bool retain
Definition: MQTTPacket.h:77
void MQTTProperties_free(MQTTProperties *props)
pending_writes * SocketBuffer_updateWrite(int socket, char *topic, char *payload)
Definition: SocketBuffer.c:424
int Socket_noPendingWrites(int socket)
Definition: Socket.c:417
static int MQTTProtocol_startPublishCommon(Clients *pubclient, Publish *publish, int qos, int retained)
int MQTTPacket_send_puback(int MQTTVersion, int msgid, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:668
int msgID
Definition: Clients.h:130
#define PAHO_MEMORY_ERROR
Definition: Heap.h:26
struct Header::@59 bits
int MQTTProtocol_handlePublishes(void *pack, int sock)
char * payload
Definition: MQTTPacket.h:203
MQTTProperties properties
Definition: Clients.h:59
List pending_writes
Definition: MQTTProtocol.h:40
List * outboundMsgs
Definition: Clients.h:136
#define PERSISTENCE_V5_PUBLISH_SENT
#define PERSISTENCE_V5_PUBLISH_RECEIVED
uint8_t mask[4]
Definition: Clients.h:47
#define malloc(x)
Definition: Heap.h:41
void MQTTPacket_freePublish(Publish *pack)
Definition: MQTTPacket.c:601
int topiclen
Definition: MQTTPacket.h:201
int len
Definition: Clients.h:63
int ListRemove(List *aList, void *content)
Definition: LinkedList.c:257
START_TIME_TYPE lastTouch
Definition: Clients.h:61
void Protocol_processPublication(Publish *publish, Clients *client, int allocatePayload)
Definition: MQTTAsync.c:3396
Publications * p
Definition: MQTTProtocol.h:31
#define free(x)
Definition: Heap.h:55
int MQTTPacket_send_pubcomp(int MQTTVersion, int msgid, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:763
networkHandles net
Definition: Clients.h:129
unsigned int connected
Definition: Clients.h:125
int clientSocketCompare(void *a, void *b)
Definition: Clients.c:50
ListElement * ListInsert(List *aList, void *content, size_t size, ListElement *index)
Definition: LinkedList.c:107
int MQTTPersistence_remove(Clients *c, char *type, int qos, int msgId)
unsigned int good
Definition: Clients.h:126
void ListEmpty(List *aList)
Definition: LinkedList.c:359
MQTTProperties MQTTProperties_copy(const MQTTProperties *props)
char * MQTTStrncpy(char *dest, const char *src, size_t dest_size)
#define min(A, B)
constexpr size_t count()
Definition: core.h:960
#define FUNC_EXIT_RC(x)
Definition: StackTrace.h:63
#define SOCKET_ERROR
Definition: Socket.h:76
List * clients
Definition: Clients.h:163
static void MQTTProtocol_retries(START_TIME_TYPE now, Clients *client, int regardless)
ListElement * ListNextElement(List *aList, ListElement **pos)
Definition: LinkedList.c:411
int keepAliveInterval
Definition: Clients.h:131
bool dup
Definition: MQTTPacket.h:79
void MQTTProtocol_keepalive(START_TIME_TYPE now)
Definition: Log.h:42
#define max(A, B)
Definition: Socket.h:88
int payloadlen
Definition: MQTTPacket.h:204
START_TIME_TYPE lastReceived
Definition: Clients.h:82
static void MQTTProtocol_storeQoS0(Clients *pubclient, Publish *publish)
int MQTTPacket_send_pubrec(int MQTTVersion, int msgid, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:722
int MQTTPacket_send_pubrel(int MQTTVersion, int msgid, int dup, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:743
#define MAX_MSG_ID
Definition: MQTTProtocol.h:25
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
Definition: Log.c:417
unsigned char rc
Definition: MQTTPacket.h:218
MQTTProtocol state
Definition: sol.hpp:23399
void MQTTProtocol_retry(START_TIME_TYPE now, int doRetry, int regardless)
int msgid
Definition: Clients.h:57
int qos
Definition: test6.c:56
MQTTProperties properties
Definition: MQTTPacket.h:206
int payloadlen
Definition: Clients.h:45
void * payload
Definition: Clients.h:73
#define START_TIME_TYPE
Definition: MQTTTime.h:36
int refcount
Definition: Clients.h:46
Definition: Log.h:41
ListElement * ListAppend(List *aList, void *content, size_t size)
Definition: LinkedList.c:90
List publications
Definition: MQTTProtocol.h:37
char nextMessageType
Definition: Clients.h:62
void MQTTProtocol_closeSession(Clients *c, int sendwill)
Definition: MQTTAsync.c:3917
MQTTAsync client
Definition: test6.c:276
int MQTTVersion
Definition: MQTTPacket.h:219
#define FUNC_ENTRY
Definition: StackTrace.h:55
START_TIME_TYPE lastSent
Definition: Clients.h:81
unsigned int ping_outstanding
Definition: Clients.h:127
char * payload
Definition: Clients.h:44
char * MQTTStrdup(const char *src)
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
int topiclen
Definition: Clients.h:43
char * Socket_getpeer(int sock)
Definition: Socket.c:1058
int messageIDCompare(void *a, void *b)
int retained
Definition: test6.c:57
void MQTTProtocol_emptyMessageList(List *msgList)
char * clientid
Definition: test6.c:54
const void * password
Definition: Clients.h:122
char * topic
Definition: Clients.h:42
char * topic
Definition: Clients.h:71
int retain
Definition: Clients.h:56
START_TIME_TYPE MQTTTime_now(void)
Definition: MQTTTime.c:66
int MQTTProtocol_handlePubrels(void *pack, int sock)
int MQTTVersion
Definition: Clients.h:58
Header header
Definition: MQTTPacket.h:216
int MQTTPacket_send_pingreq(networkHandles *net, const char *clientID)
Header header
Definition: MQTTPacket.h:199
char * clientID
Definition: Clients.h:119
int MQTTProtocol_handlePubrecs(void *pack, int sock)
#define TCPSOCKET_COMPLETE
Definition: Socket.h:73
int MQTTProtocol_startPublish(Clients *pubclient, Publish *publish, int qos, int retained, Messages **mm)
int MQTTVersion
Definition: MQTTPacket.h:205
MQTTProperties properties
Definition: MQTTPacket.h:220
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
Definition: LinkedList.c:154
List * inboundMsgs
Definition: Clients.h:135
ClientStates * bstate
Definition: MQTTAsync.c:117
int retryInterval
Definition: Clients.h:132
unsigned int msgs_sent
Definition: MQTTProtocol.h:39
Publications * publish
Definition: Clients.h:60
void ListFree(List *aList)
Definition: LinkedList.c:381
enum MQTTReasonCodes rc
Definition: test10.c:1112
#define PERSISTENCE_PUBLISH_RECEIVED
int MQTTPacket_send_publish(Publish *pack, int dup, int qos, int retained, networkHandles *net, const char *clientID)
Definition: MQTTPacket.c:835
Publications * MQTTProtocol_storePublication(Publish *publish, int *len)
const char * username
Definition: Clients.h:120
unsigned int qos
Definition: MQTTPacket.h:78
uint8_t mask[4]
Definition: MQTTPacket.h:207
#define TCPSOCKET_INTERRUPTED
Definition: Socket.h:79
int len
Definition: utf-8.c:46
int MQTTProtocol_assignMsgId(Clients *client)
willMessages * will
Definition: Clients.h:134


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:09