MQTTPersistence.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs - async client updates
16  * Ian Craggs - fix for bug 432903 - queue persistence
17  * Ian Craggs - MQTT V5 updates
18  *******************************************************************************/
19 
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include "MQTTPersistence.h"
30 #include "MQTTPersistenceDefault.h"
31 #include "MQTTProtocolClient.h"
32 #include "Heap.h"
33 
34 
35 static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion);
36 static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size);
37 
45 #include "StackTrace.h"
46 
48 {
49  int rc = 0;
50  MQTTClient_persistence* per = NULL;
51 
52  FUNC_ENTRY;
53 #if !defined(NO_PERSISTENCE)
54  switch (type)
55  {
57  per = NULL;
58  break;
60  per = malloc(sizeof(MQTTClient_persistence));
61  if ( per != NULL )
62  {
63  if ( pcontext == NULL )
64  pcontext = "."; /* working directory */
65  if ((per->context = malloc(strlen(pcontext) + 1)) == NULL)
66  {
67  free(per);
68  rc = PAHO_MEMORY_ERROR;
69  goto exit;
70  }
71  strcpy(per->context, pcontext);
72  /* file system functions */
73  per->popen = pstopen;
74  per->pclose = pstclose;
75  per->pput = pstput;
76  per->pget = pstget;
77  per->premove = pstremove;
78  per->pkeys = pstkeys;
79  per->pclear = pstclear;
81  }
82  else
83  rc = PAHO_MEMORY_ERROR;
84  break;
86  per = (MQTTClient_persistence *)pcontext;
87  if ( per == NULL || (per != NULL && (per->context == NULL || per->pclear == NULL ||
88  per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL ||
89  per->popen == NULL || per->pput == NULL || per->premove == NULL)) )
91  break;
92  default:
94  break;
95  }
96 #endif
97 
98  *persistence = per;
99 exit:
100  FUNC_EXIT_RC(rc);
101  return rc;
102 }
103 
104 
111 int MQTTPersistence_initialize(Clients *c, const char *serverURI)
112 {
113  int rc = 0;
114 
115  FUNC_ENTRY;
116  if ( c->persistence != NULL )
117  {
118  rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context);
119  if ( rc == 0 )
121  }
122 
123  FUNC_EXIT_RC(rc);
124  return rc;
125 }
126 
127 
134 {
135  int rc = 0;
136 
137  FUNC_ENTRY;
138 #if !defined(NO_PERSISTENCE)
139  if (c->persistence != NULL)
140  {
141  rc = c->persistence->pclose(c->phandle);
142 
143  if (c->persistence->context)
144  free(c->persistence->context);
145  if (c->persistence->popen == pstopen)
146  free(c->persistence);
147 
148  c->phandle = NULL;
149  c->persistence = NULL;
150  }
151 #endif
152  FUNC_EXIT_RC(rc);
153  return rc;
154 }
155 
162 {
163  int rc = 0;
164 
165  FUNC_ENTRY;
166  if (c->persistence != NULL)
167  rc = c->persistence->pclear(c->phandle);
168 
169  FUNC_EXIT_RC(rc);
170  return rc;
171 }
172 
173 
181 {
182  int rc = 0;
183  char **msgkeys = NULL,
184  *buffer = NULL;
185  int nkeys, buflen;
186  int i = 0;
187  int msgs_sent = 0;
188  int msgs_rcvd = 0;
189 
190  FUNC_ENTRY;
191  if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
192  {
193  while (rc == 0 && i < nkeys)
194  {
195  if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0 ||
196  strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
197  {
198  ;
199  }
200  else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0 ||
201  strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
202  {
203  ;
204  }
205  else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
206  (c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
207  {
208  int data_MQTTVersion = MQTTVERSION_3_1_1;
209  char* cur_key = msgkeys[i];
210  MQTTPacket* pack = NULL;
211 
212  if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_RECEIVED,
213  strlen(PERSISTENCE_V5_PUBLISH_RECEIVED)) == 0)
214  {
215  data_MQTTVersion = MQTTVERSION_5;
217  }
218  else if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_SENT,
219  strlen(PERSISTENCE_V5_PUBLISH_SENT)) == 0)
220  {
221  data_MQTTVersion = MQTTVERSION_5;
222  cur_key = PERSISTENCE_PUBLISH_SENT;
223  }
224  else if (strncmp(cur_key, PERSISTENCE_V5_PUBREL,
225  strlen(PERSISTENCE_V5_PUBREL)) == 0)
226  {
227  data_MQTTVersion = MQTTVERSION_5;
228  cur_key = PERSISTENCE_PUBREL;
229  }
230 
231  if (data_MQTTVersion == MQTTVERSION_5 && c->MQTTVersion < MQTTVERSION_5)
232  {
233  rc = MQTTCLIENT_PERSISTENCE_ERROR; /* can't restore version 5 data with a version 3 client */
234  goto exit;
235  }
236 
237  pack = MQTTPersistence_restorePacket(data_MQTTVersion, buffer, buflen);
238  if ( pack != NULL )
239  {
240  if (strncmp(cur_key, PERSISTENCE_PUBLISH_RECEIVED,
241  strlen(PERSISTENCE_PUBLISH_RECEIVED)) == 0)
242  {
243  Publish* publish = (Publish*)pack;
244  Messages* msg = NULL;
245  publish->MQTTVersion = c->MQTTVersion;
246  msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
247  msg->nextMessageType = PUBREL;
248  /* order does not matter for persisted received messages */
249  ListAppend(c->inboundMsgs, msg, msg->len);
250  if (c->MQTTVersion >= MQTTVERSION_5)
251  {
252  free(msg->publish->payload);
253  free(msg->publish->topic);
254  msg->publish->payload = msg->publish->topic = NULL;
255  }
256  publish->topic = NULL;
257  MQTTPacket_freePublish(publish);
258  msgs_rcvd++;
259  }
260  else if (strncmp(cur_key, PERSISTENCE_PUBLISH_SENT,
261  strlen(PERSISTENCE_PUBLISH_SENT)) == 0)
262  {
263  Publish* publish = (Publish*)pack;
264  Messages* msg = NULL;
265  char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
266 
267  if (!key)
268  {
269  rc = PAHO_MEMORY_ERROR;
270  goto exit;
271  }
272  publish->MQTTVersion = c->MQTTVersion;
273  if (publish->MQTTVersion >= MQTTVERSION_5)
274  sprintf(key, "%s%d", PERSISTENCE_V5_PUBREL, publish->msgId);
275  else
276  sprintf(key, "%s%d", PERSISTENCE_PUBREL, publish->msgId);
277  msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
278  if (c->persistence->pcontainskey(c->phandle, key) == 0)
279  /* PUBLISH Qo2 and PUBREL sent */
280  msg->nextMessageType = PUBCOMP;
281  /* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */
282  /* retry at the first opportunity */
283  memset(&msg->lastTouch, '\0', sizeof(msg->lastTouch));
285  publish->topic = NULL;
286  MQTTPacket_freePublish(publish);
287  free(key);
288  msgs_sent++;
289  }
290  else if (strncmp(cur_key, PERSISTENCE_PUBREL, strlen(PERSISTENCE_PUBREL)) == 0)
291  {
292  /* orphaned PUBRELs ? */
293  Pubrel* pubrel = (Pubrel*)pack;
294  char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
295 
296  if (!key)
297  {
298  rc = PAHO_MEMORY_ERROR;
299  goto exit;
300  }
301  pubrel->MQTTVersion = c->MQTTVersion;
302  if (pubrel->MQTTVersion >= MQTTVERSION_5)
303  sprintf(key, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, pubrel->msgId);
304  else
305  sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId);
306  if (c->persistence->pcontainskey(c->phandle, key) != 0)
307  rc = c->persistence->premove(c->phandle, msgkeys[i]);
308  free(pubrel);
309  free(key);
310  }
311  }
312  else /* pack == NULL -> bad persisted record */
313  rc = c->persistence->premove(c->phandle, msgkeys[i]);
314  }
315  if (buffer)
316  {
317  free(buffer);
318  buffer = NULL;
319  }
320  if (msgkeys[i])
321  free(msgkeys[i]);
322  i++;
323  }
324  if (msgkeys)
325  free(msgkeys);
326  }
327  Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n",
328  msgs_sent, msgs_rcvd, c->clientID);
330 exit:
331  FUNC_EXIT_RC(rc);
332  return rc;
333 }
334 
335 
341 void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen)
342 {
343  void* pack = NULL;
344  Header header;
345  int fixed_header_length = 1, ptype, remaining_length = 0;
346  char c;
347  int multiplier = 1;
348  extern pf new_packets[];
349 
350  FUNC_ENTRY;
351  header.byte = buffer[0];
352  /* decode the message length according to the MQTT algorithm */
353  do
354  {
355  c = *(++buffer);
356  remaining_length += (c & 127) * multiplier;
357  multiplier *= 128;
358  fixed_header_length++;
359  } while ((c & 128) != 0);
360 
361  if ( (fixed_header_length + remaining_length) == buflen )
362  {
363  ptype = header.bits.type;
364  if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
365  pack = (*new_packets[ptype])(MQTTVersion, header.byte, ++buffer, remaining_length);
366  }
367 
368  FUNC_EXIT;
369  return pack;
370 }
371 
372 
379 void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
380 {
381  ListElement* index = NULL;
382  ListElement* current = NULL;
383 
384  FUNC_ENTRY;
385  while(ListNextElement(list, &current) != NULL && index == NULL)
386  {
387  if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid )
388  index = current;
389  }
390 
391  ListInsert(list, content, size, index);
392  FUNC_EXIT;
393 }
394 
395 
412 int MQTTPersistence_putPacket(int socket, char* buf0, size_t buf0len, int count,
413  char** buffers, size_t* buflens, int htype, int msgId, int scr, int MQTTVersion)
414 {
415  int rc = 0;
416  extern ClientStates* bstate;
417  int nbufs, i;
418  int* lens = NULL;
419  char** bufs = NULL;
420  char *key;
421  Clients* client = NULL;
422 
423  FUNC_ENTRY;
424  client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content);
425  if (client->persistence != NULL)
426  {
427  if ((key = malloc(MESSAGE_FILENAME_LENGTH + 1)) == NULL)
428  {
429  rc = PAHO_MEMORY_ERROR;
430  goto exit;
431  }
432  nbufs = 1 + count;
433  if ((lens = (int *)malloc(nbufs * sizeof(int))) == NULL)
434  {
435  free(key);
436  rc = PAHO_MEMORY_ERROR;
437  goto exit;
438  }
439  if ((bufs = (char **)malloc(nbufs * sizeof(char *))) == NULL)
440  {
441  free(key);
442  free(lens);
443  rc = PAHO_MEMORY_ERROR;
444  goto exit;
445  }
446  lens[0] = (int)buf0len;
447  bufs[0] = buf0;
448  for (i = 0; i < count; i++)
449  {
450  lens[i+1] = (int)buflens[i];
451  bufs[i+1] = buffers[i];
452  }
453 
454  /* key */
455  if (scr == 0)
456  { /* sending */
457  char* key_id = PERSISTENCE_PUBLISH_SENT;
458 
459  if (htype == PUBLISH) /* PUBLISH QoS1 and QoS2*/
460  {
461  if (MQTTVersion >= MQTTVERSION_5)
463  }
464  else if (htype == PUBREL) /* PUBREL */
465  {
466  if (MQTTVersion >= MQTTVERSION_5)
467  key_id = PERSISTENCE_V5_PUBREL;
468  else
469  key_id = PERSISTENCE_PUBREL;
470  }
471  sprintf(key, "%s%d", key_id, msgId);
472  }
473  else if (scr == 1) /* receiving PUBLISH QoS2 */
474  {
475  char* key_id = PERSISTENCE_PUBLISH_RECEIVED;
476 
477  if (MQTTVersion >= MQTTVERSION_5)
479  sprintf(key, "%s%d", key_id, msgId);
480  }
481 
482  if (client->beforeWrite)
483  rc = client->beforeWrite(client->beforeWrite_context, nbufs, bufs, lens);
484 
485  if (rc == 0)
486  rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens);
487 
488  free(key);
489  free(lens);
490  free(bufs);
491  }
492 
493 exit:
494  FUNC_EXIT_RC(rc);
495  return rc;
496 }
497 
498 
508 int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
509 {
510  int rc = 0;
511 
512  FUNC_ENTRY;
513  if (c->persistence != NULL)
514  {
515  char *key = malloc(MESSAGE_FILENAME_LENGTH + 1);
516 
517  if (!key)
518  {
519  rc = PAHO_MEMORY_ERROR;
520  goto exit;
521  }
522  if (strcmp(type, PERSISTENCE_PUBLISH_SENT) == 0 ||
523  strcmp(type, PERSISTENCE_V5_PUBLISH_SENT) == 0)
524  {
525  sprintf(key, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, msgId) ;
526  rc = c->persistence->premove(c->phandle, key);
527  sprintf(key, "%s%d", PERSISTENCE_V5_PUBREL, msgId) ;
528  rc += c->persistence->premove(c->phandle, key);
529  sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId) ;
530  rc += c->persistence->premove(c->phandle, key);
531  sprintf(key, "%s%d", PERSISTENCE_PUBREL, msgId) ;
532  rc += c->persistence->premove(c->phandle, key);
533  }
534  else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */
535  { /* or PERSISTENCE_PUBLISH_RECEIVED */
536 
537  sprintf(key, "%s%d", PERSISTENCE_V5_PUBLISH_RECEIVED, msgId);
538  rc = c->persistence->premove(c->phandle, key);
539  sprintf(key, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, msgId);
540  rc += c->persistence->premove(c->phandle, key);
541  }
542  free(key);
543  }
544 
545 exit:
546  FUNC_EXIT_RC(rc);
547  return rc;
548 }
549 
550 
557 {
558  ListElement* wrapel = NULL;
559  ListElement* current = NULL;
560 
561  FUNC_ENTRY;
562  if ( client->outboundMsgs->count > 0 )
563  {
564  int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid;
565  int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid;
566  int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
567  current = ListNextElement(client->outboundMsgs, &current);
568 
569  while(ListNextElement(client->outboundMsgs, &current) != NULL)
570  {
571  int curMsgID = ((Messages*)current->content)->msgid;
572  int curPrevMsgID = ((Messages*)current->prev->content)->msgid;
573  int curgap = curMsgID - curPrevMsgID;
574  if ( curgap > gap )
575  {
576  gap = curgap;
577  wrapel = current;
578  }
579  }
580  }
581 
582  if ( wrapel != NULL )
583  {
584  /* put wrapel at the beginning of the queue */
585  client->outboundMsgs->first->prev = client->outboundMsgs->last;
586  client->outboundMsgs->last->next = client->outboundMsgs->first;
587  client->outboundMsgs->first = wrapel;
588  client->outboundMsgs->last = wrapel->prev;
589  client->outboundMsgs->first->prev = NULL;
590  client->outboundMsgs->last->next = NULL;
591  }
592  FUNC_EXIT;
593 }
594 
595 
596 #if !defined(NO_PERSISTENCE)
598 {
599  int rc = 0;
600  char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
601 
602  FUNC_ENTRY;
603  if (client->MQTTVersion >= MQTTVERSION_5)
604  sprintf(key, "%s%u", PERSISTENCE_V5_QUEUE_KEY, qe->seqno);
605  else
606  sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
607  if ((rc = client->persistence->premove(client->phandle, key)) != 0)
608  Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
609  FUNC_EXIT_RC(rc);
610  return rc;
611 }
612 
613 
614 #define MAX_NO_OF_BUFFERS 9
616 {
617  int rc = 0;
618  int bufindex = 0;
619  char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
620  int lens[MAX_NO_OF_BUFFERS];
621  void* bufs[MAX_NO_OF_BUFFERS];
622  int props_allocated = 0;
623 
624  FUNC_ENTRY;
625  bufs[bufindex] = &qe->msg->payloadlen;
626  lens[bufindex++] = sizeof(qe->msg->payloadlen);
627 
628  bufs[bufindex] = qe->msg->payload;
629  lens[bufindex++] = qe->msg->payloadlen;
630 
631  bufs[bufindex] = &qe->msg->qos;
632  lens[bufindex++] = sizeof(qe->msg->qos);
633 
634  bufs[bufindex] = &qe->msg->retained;
635  lens[bufindex++] = sizeof(qe->msg->retained);
636 
637  bufs[bufindex] = &qe->msg->dup;
638  lens[bufindex++] = sizeof(qe->msg->dup);
639 
640  bufs[bufindex] = &qe->msg->msgid;
641  lens[bufindex++] = sizeof(qe->msg->msgid);
642 
643  bufs[bufindex] = qe->topicName;
644  lens[bufindex++] = (int)strlen(qe->topicName) + 1;
645 
646  bufs[bufindex] = &qe->topicLen;
647  lens[bufindex++] = sizeof(qe->topicLen);
648 
649  if (++aclient->qentry_seqno == PERSISTENCE_SEQNO_LIMIT)
650  aclient->qentry_seqno = 0;
651 
652  if (aclient->MQTTVersion >= MQTTVERSION_5) /* persist properties */
653  {
655  MQTTProperties* props = &no_props;
656  int temp_len = 0;
657  char* ptr = NULL;
658 
659  if (qe->msg->struct_version >= 1)
660  props = &qe->msg->properties;
661 
662  temp_len = MQTTProperties_len(props);
663  ptr = bufs[bufindex] = malloc(temp_len);
664  if (!ptr)
665  {
666  rc = PAHO_MEMORY_ERROR;
667  goto exit;
668  }
669  props_allocated = bufindex;
670  rc = MQTTProperties_write(&ptr, props);
671  lens[bufindex++] = temp_len;
672 
673  sprintf(key, "%s%u", PERSISTENCE_V5_QUEUE_KEY, aclient->qentry_seqno);
674  }
675  else
676  sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, aclient->qentry_seqno);
677 
678  qe->seqno = aclient->qentry_seqno;
679 
680  if (aclient->beforeWrite)
681  rc = aclient->beforeWrite(aclient->beforeWrite_context, bufindex, (char**)bufs, lens);
682 
683  if (rc == 0 && (rc = aclient->persistence->pput(aclient->phandle, key, bufindex, (char**)bufs, lens)) != 0)
684  Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
685 
686  if (props_allocated != 0)
687  free(bufs[props_allocated]);
688 
689 exit:
690  FUNC_EXIT_RC(rc);
691  return rc;
692 }
693 
694 
695 static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion)
696 {
697  MQTTPersistence_qEntry* qe = NULL;
698  char* ptr = buffer;
699  int data_size;
700 
701  FUNC_ENTRY;
702  if ((qe = malloc(sizeof(MQTTPersistence_qEntry))) == NULL)
703  goto exit;
704  memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
705 
706  if ((qe->msg = malloc(sizeof(MQTTPersistence_message))) == NULL)
707  {
708  free(qe);
709  qe = NULL;
710  goto exit;
711  }
712  memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
713 
714  qe->msg->struct_version = 1;
715 
716  qe->msg->payloadlen = *(int*)ptr;
717  ptr += sizeof(int);
718 
719  data_size = qe->msg->payloadlen;
720  if ((qe->msg->payload = malloc(data_size)) == NULL)
721  {
722  free(qe->msg);
723  free(qe);
724  qe = NULL;
725  goto exit;
726  }
727  memcpy(qe->msg->payload, ptr, data_size);
728  ptr += data_size;
729 
730  qe->msg->qos = *(int*)ptr;
731  ptr += sizeof(int);
732 
733  qe->msg->retained = *(int*)ptr;
734  ptr += sizeof(int);
735 
736  qe->msg->dup = *(int*)ptr;
737  ptr += sizeof(int);
738 
739  qe->msg->msgid = *(int*)ptr;
740  ptr += sizeof(int);
741 
742  data_size = (int)strlen(ptr) + 1;
743  if ((qe->topicName = malloc(data_size)) == NULL)
744  {
745  free(qe->msg->payload);
746  free(qe->msg);
747  free(qe);
748  qe = NULL;
749  goto exit;
750  }
751  strcpy(qe->topicName, ptr);
752  ptr += data_size;
753 
754  qe->topicLen = *(int*)ptr;
755  ptr += sizeof(int);
756 
757  if (MQTTVersion >= MQTTVERSION_5 &&
758  MQTTProperties_read(&qe->msg->properties, &ptr, buffer + buflen) != 1)
759  Log(LOG_ERROR, -1, "Error restoring properties from persistence");
760 
761 exit:
762  FUNC_EXIT;
763  return qe;
764 }
765 
766 
768 {
769  ListElement* index = NULL;
770  ListElement* current = NULL;
771 
772  FUNC_ENTRY;
773  while (ListNextElement(list, &current) != NULL && index == NULL)
774  {
775  if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno)
776  index = current;
777  }
778  ListInsert(list, qEntry, size, index);
779  FUNC_EXIT;
780 }
781 
782 
789 {
790  int rc = 0;
791  char **msgkeys;
792  int nkeys;
793  int i = 0;
794  int entries_restored = 0;
795 
796  FUNC_ENTRY;
797  if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
798  {
799  while (rc == 0 && i < nkeys)
800  {
801  char *buffer = NULL;
802  int buflen;
803 
804  if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0 &&
805  strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) != 0)
806  {
807  ; /* ignore if not a queue entry key */
808  }
809  else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
810  (c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
811  {
812  int MQTTVersion =
813  (strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
815  MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen, MQTTVersion);
816 
817  if (qe)
818  {
819  qe->seqno = atoi(strchr(msgkeys[i], '-')+1); /* key format is tag'-'seqno */
821  free(buffer);
822  c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
823  entries_restored++;
824  }
825  }
826  if (msgkeys[i])
827  {
828  free(msgkeys[i]);
829  }
830  i++;
831  }
832  if (msgkeys != NULL)
833  free(msgkeys);
834  }
835  Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
836  FUNC_EXIT_RC(rc);
837  return rc;
838 }
839 #endif
Persistence_containskey pcontainskey
#define PERSISTENCE_PUBLISH_SENT
List * messageQueue
Definition: Clients.h:137
Messages * MQTTProtocol_createMessage(Publish *publish, Messages **mm, int qos, int retained, int allocatePayload)
int MQTTProperties_read(MQTTProperties *properties, char **pptr, char *enddata)
int pstopen(void **handle, const char *clientID, const char *serverURI, void *context)
char * topic
Definition: MQTTPacket.h:200
int MQTTProperties_len(MQTTProperties *props)
int MQTTPersistence_clear(Clients *c)
int MQTTProperties_write(char **pptr, const MQTTProperties *properties)
MQTTPersistence_beforeWrite * beforeWrite
Definition: Clients.h:141
#define FUNC_EXIT
Definition: StackTrace.h:59
MQTTProperties props
Definition: paho_c_pub.c:54
int msgId
Definition: MQTTPacket.h:217
int MQTTVersion
Definition: Clients.h:146
int msgId
Definition: MQTTPacket.h:202
bool retain
Definition: MQTTPacket.h:77
#define MQTTCLIENT_PERSISTENCE_ERROR
char byte
Definition: MQTTPacket.h:65
void *(* pf)(int, unsigned char, char *, size_t)
Definition: MQTTPacket.h:32
#define PAHO_MEMORY_ERROR
Definition: Heap.h:26
struct Header::@59 bits
#define MQTTCLIENT_PERSISTENCE_USER
List * outboundMsgs
Definition: Clients.h:136
#define PERSISTENCE_V5_PUBLISH_SENT
int pstclear(void *handle)
#define PERSISTENCE_V5_PUBLISH_RECEIVED
#define MAX_NO_OF_BUFFERS
#define malloc(x)
Definition: Heap.h:41
void MQTTPacket_freePublish(Publish *pack)
Definition: MQTTPacket.c:601
ClientStates * bstate
Definition: MQTTAsync.c:117
int len
Definition: Clients.h:63
struct ListElementStruct * next
Definition: LinkedList.h:58
#define PERSISTENCE_V5_COMMAND_KEY
START_TIME_TYPE lastTouch
Definition: Clients.h:61
#define free(x)
Definition: Heap.h:55
unsigned int type
Definition: MQTTPacket.h:80
#define PERSISTENCE_V5_QUEUE_KEY
void MQTTPersistence_insertInOrder(List *list, void *content, size_t size)
int clientSocketCompare(void *a, void *b)
Definition: Clients.c:50
void * beforeWrite_context
Definition: Clients.h:143
ListElement * ListInsert(List *aList, void *content, size_t size, ListElement *index)
Definition: LinkedList.c:107
int MQTTPersistence_remove(Clients *c, char *type, int qos, int msgId)
constexpr size_t count()
Definition: core.h:960
#define PERSISTENCE_COMMAND_KEY
#define FUNC_EXIT_RC(x)
Definition: StackTrace.h:63
List * clients
Definition: Clients.h:163
int count
Definition: LinkedList.h:72
ListElement * ListNextElement(List *aList, ListElement **pos)
Definition: LinkedList.c:411
pf new_packets[]
Definition: MQTTPacket.c:73
static MQTTPersistence_qEntry * MQTTPersistence_restoreQueueEntry(char *buffer, size_t buflen, int MQTTVersion)
#define max(A, B)
Definition: Socket.h:88
MQTTClient_persistence * persistence
Definition: Clients.h:140
int pstcontainskey(void *handle, char *key)
#define MQTTVERSION_3_1_1
Definition: MQTTAsync.h:203
#define MAX_MSG_ID
Definition: MQTTProtocol.h:25
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
Definition: Log.c:417
struct ListElementStruct * prev
Definition: LinkedList.h:58
int qos
Definition: test6.c:56
static void MQTTPersistence_insertInSeqOrder(List *list, MQTTPersistence_qEntry *qEntry, size_t size)
Definition: Log.h:41
ListElement * ListAppend(List *aList, void *content, size_t size)
Definition: LinkedList.c:90
#define PERSISTENCE_SEQNO_LIMIT
char nextMessageType
Definition: Clients.h:62
int MQTTPersistence_restoreMessageQueue(Clients *c)
unsigned int qentry_seqno
Definition: Clients.h:138
int persistence
Definition: test6.c:61
int MQTTPersistence_close(Clients *c)
#define MQTTCLIENT_PERSISTENCE_DEFAULT
int pstget(void *handle, char *key, char **buffer, int *buflen)
MQTTAsync client
Definition: test6.c:276
int MQTTVersion
Definition: MQTTPacket.h:219
int MQTTPersistence_restorePackets(Clients *c)
ListElement * last
Definition: LinkedList.h:69
#define FUNC_ENTRY
Definition: StackTrace.h:55
int pstclose(void *handle)
char * payload
Definition: Clients.h:44
void * afterRead_context
Definition: Clients.h:144
MQTTProperties properties
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
int pstput(void *handle, char *key, int bufcount, char *buffers[], int buflens[])
#define PERSISTENCE_MAX_KEY_LENGTH
int MQTTPersistence_unpersistQueueEntry(Clients *client, MQTTPersistence_qEntry *qe)
MQTTClient c
Definition: test10.c:1656
int MQTTPersistence_create(MQTTClient_persistence **persistence, int type, void *pcontext)
int MQTTPersistence_putPacket(int socket, char *buf0, size_t buf0len, int count, char **buffers, size_t *buflens, int htype, int msgId, int scr, int MQTTVersion)
const void * ptr(const T *p)
Definition: format.h:3610
char * topic
Definition: Clients.h:42
#define MQTTCLIENT_PERSISTENCE_NONE
void * phandle
Definition: Clients.h:139
void MQTTPersistence_wrapMsgID(Clients *client)
#define PERSISTENCE_QUEUE_KEY
#define MESSAGE_FILENAME_LENGTH
Header header
Definition: MQTTPacket.h:199
char * clientID
Definition: Clients.h:119
#define PERSISTENCE_PUBREL
int pstremove(void *handle, char *key)
A structure containing the function pointers to a persistence implementation and the context or state...
MQTTPersistence_afterRead * afterRead
Definition: Clients.h:142
int MQTTVersion
Definition: MQTTPacket.h:205
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
Definition: LinkedList.c:154
ListElement * first
Definition: LinkedList.h:69
#define PERSISTENCE_V5_PUBREL
List * inboundMsgs
Definition: Clients.h:135
Publications * publish
Definition: Clients.h:60
enum MQTTReasonCodes rc
Definition: test10.c:1112
#define PERSISTENCE_PUBLISH_RECEIVED
int MQTTPersistence_persistQueueEntry(Clients *aclient, MQTTPersistence_qEntry *qe)
MQTTPersistence_message * msg
unsigned int qos
Definition: MQTTPacket.h:78
int MQTTPersistence_initialize(Clients *c, const char *serverURI)
void * MQTTPersistence_restorePacket(int MQTTVersion, char *buffer, size_t buflen)
int pstkeys(void *handle, char ***keys, int *nkeys)
#define MQTTProperties_initializer


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:09