53 #if !defined(NO_PERSISTENCE) 63 if ( pcontext == NULL )
87 if ( per == NULL || (per != NULL && (per->
context == NULL || per->
pclear == NULL ||
138 #if !defined(NO_PERSISTENCE) 183 char **msgkeys = NULL,
193 while (rc == 0 && i < nkeys)
209 char* cur_key = msgkeys[i];
256 publish->
topic = NULL;
285 publish->
topic = NULL;
327 Log(
TRACE_MINIMUM, -1,
"%d sent messages and %d received messages restored for client %s\n",
345 int fixed_header_length = 1, ptype, remaining_length = 0;
351 header.
byte = buffer[0];
356 remaining_length += (c & 127) * multiplier;
358 fixed_header_length++;
359 }
while ((c & 128) != 0);
361 if ( (fixed_header_length + remaining_length) == buflen )
365 pack = (*new_packets[ptype])(MQTTVersion, header.
byte, ++buffer, remaining_length);
413 char** buffers,
size_t* buflens,
int htype,
int msgId,
int scr,
int MQTTVersion)
433 if ((lens = (
int *)
malloc(nbufs *
sizeof(
int))) == NULL)
439 if ((bufs = (
char **)
malloc(nbufs *
sizeof(
char *))) == NULL)
446 lens[0] = (int)buf0len;
448 for (i = 0; i <
count; i++)
450 lens[i+1] = (int)buflens[i];
451 bufs[i+1] = buffers[i];
471 sprintf(key,
"%s%d", key_id, msgId);
479 sprintf(key,
"%s%d", key_id, msgId);
566 int gap =
MAX_MSG_ID - lastMsgID + firstMsgID;
573 int curgap = curMsgID - curPrevMsgID;
582 if ( wrapel != NULL )
596 #if !defined(NO_PERSISTENCE) 608 Log(
LOG_ERROR, 0,
"Error %d removing qEntry from persistence", rc);
614 #define MAX_NO_OF_BUFFERS 9 622 int props_allocated = 0;
631 bufs[bufindex] = &qe->
msg->
qos;
632 lens[bufindex++] =
sizeof(qe->
msg->
qos);
637 bufs[bufindex] = &qe->
msg->
dup;
638 lens[bufindex++] =
sizeof(qe->
msg->
dup);
641 lens[bufindex++] =
sizeof(qe->
msg->
msgid);
644 lens[bufindex++] = (int)strlen(qe->
topicName) + 1;
647 lens[bufindex++] =
sizeof(qe->
topicLen);
663 ptr = bufs[bufindex] =
malloc(temp_len);
669 props_allocated = bufindex;
671 lens[bufindex++] = temp_len;
683 if (rc == 0 && (rc = aclient->
persistence->
pput(aclient->
phandle, key, bufindex, (
char**)bufs, lens)) != 0)
684 Log(
LOG_ERROR, 0,
"Error persisting queue entry, rc %d", rc);
686 if (props_allocated != 0)
687 free(bufs[props_allocated]);
730 qe->
msg->
qos = *(
int*)ptr;
736 qe->
msg->
dup = *(
int*)ptr;
742 data_size = (int)strlen(ptr) + 1;
759 Log(
LOG_ERROR, -1,
"Error restoring properties from persistence");
794 int entries_restored = 0;
799 while (rc == 0 && i < nkeys)
819 qe->
seqno = atoi(strchr(msgkeys[i],
'-')+1);
Persistence_containskey pcontainskey
#define PERSISTENCE_PUBLISH_SENT
Messages * MQTTProtocol_createMessage(Publish *publish, Messages **mm, int qos, int retained, int allocatePayload)
int MQTTProperties_read(MQTTProperties *properties, char **pptr, char *enddata)
int pstopen(void **handle, const char *clientID, const char *serverURI, void *context)
int MQTTProperties_len(MQTTProperties *props)
int MQTTPersistence_clear(Clients *c)
int MQTTProperties_write(char **pptr, const MQTTProperties *properties)
MQTTPersistence_beforeWrite * beforeWrite
#define MQTTCLIENT_PERSISTENCE_ERROR
void *(* pf)(int, unsigned char, char *, size_t)
#define PAHO_MEMORY_ERROR
#define MQTTCLIENT_PERSISTENCE_USER
#define PERSISTENCE_V5_PUBLISH_SENT
int pstclear(void *handle)
#define PERSISTENCE_V5_PUBLISH_RECEIVED
#define MAX_NO_OF_BUFFERS
void MQTTPacket_freePublish(Publish *pack)
struct ListElementStruct * next
#define PERSISTENCE_V5_COMMAND_KEY
START_TIME_TYPE lastTouch
#define PERSISTENCE_V5_QUEUE_KEY
void MQTTPersistence_insertInOrder(List *list, void *content, size_t size)
int clientSocketCompare(void *a, void *b)
void * beforeWrite_context
ListElement * ListInsert(List *aList, void *content, size_t size, ListElement *index)
int MQTTPersistence_remove(Clients *c, char *type, int qos, int msgId)
#define PERSISTENCE_COMMAND_KEY
ListElement * ListNextElement(List *aList, ListElement **pos)
static MQTTPersistence_qEntry * MQTTPersistence_restoreQueueEntry(char *buffer, size_t buflen, int MQTTVersion)
MQTTClient_persistence * persistence
int pstcontainskey(void *handle, char *key)
#define MQTTVERSION_3_1_1
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
struct ListElementStruct * prev
static void MQTTPersistence_insertInSeqOrder(List *list, MQTTPersistence_qEntry *qEntry, size_t size)
ListElement * ListAppend(List *aList, void *content, size_t size)
#define PERSISTENCE_SEQNO_LIMIT
int MQTTPersistence_restoreMessageQueue(Clients *c)
unsigned int qentry_seqno
int MQTTPersistence_close(Clients *c)
#define MQTTCLIENT_PERSISTENCE_DEFAULT
int pstget(void *handle, char *key, char **buffer, int *buflen)
int MQTTPersistence_restorePackets(Clients *c)
int pstclose(void *handle)
MQTTProperties properties
Persistence_remove premove
int pstput(void *handle, char *key, int bufcount, char *buffers[], int buflens[])
#define PERSISTENCE_MAX_KEY_LENGTH
int MQTTPersistence_unpersistQueueEntry(Clients *client, MQTTPersistence_qEntry *qe)
int MQTTPersistence_create(MQTTClient_persistence **persistence, int type, void *pcontext)
int MQTTPersistence_putPacket(int socket, char *buf0, size_t buf0len, int count, char **buffers, size_t *buflens, int htype, int msgId, int scr, int MQTTVersion)
#define MQTTCLIENT_PERSISTENCE_NONE
void MQTTPersistence_wrapMsgID(Clients *client)
#define PERSISTENCE_QUEUE_KEY
#define MESSAGE_FILENAME_LENGTH
#define PERSISTENCE_PUBREL
int pstremove(void *handle, char *key)
A structure containing the function pointers to a persistence implementation and the context or state...
MQTTPersistence_afterRead * afterRead
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
#define PERSISTENCE_V5_PUBREL
#define PERSISTENCE_PUBLISH_RECEIVED
int MQTTPersistence_persistQueueEntry(Clients *aclient, MQTTPersistence_qEntry *qe)
MQTTPersistence_message * msg
int MQTTPersistence_initialize(Clients *c, const char *serverURI)
void * MQTTPersistence_restorePacket(int MQTTVersion, char *buffer, size_t buflen)
int pstkeys(void *handle, char ***keys, int *nkeys)
#define MQTTProperties_initializer