36 #if !defined(NO_PERSISTENCE) 44 #define min(A,B) ( (A) < (B) ? (A):(B)) 68 return msg->
msgid == *(
int*)b;
80 int start_msgid = client->
msgID;
81 int msgid = start_msgid;
88 if (msgid == start_msgid)
95 client->
msgID = msgid;
174 p.
payload = (*mm)->publish->payload;
175 p.
topic = (*mm)->publish->topic;
181 memcpy((*mm)->publish->mask, p.
mask,
sizeof((*mm)->publish->mask));
204 if (*mm == NULL || (*mm)->
publish == NULL)
228 ++(((*mm)->publish)->refcount);
260 *len = (int)strlen(publish->
topic)+1;
262 publish->
topic = NULL;
333 int already_received = 0;
360 already_received = 1;
393 publish->
topic = NULL;
428 #if !defined(NO_PERSISTENCE) 487 Log(
TRACE_MIN, -1,
"Pubrec error %d received for client %s msgid %d, not sending PUBREL",
489 #if !defined(NO_PERSISTENCE) 556 memset(&publish,
'\0',
sizeof(publish));
574 #if !defined(NO_PERSISTENCE) 629 #if !defined(NO_PERSISTENCE) 634 Log(
LOG_ERROR, -1,
"Error removing PUBCOMP for client id %s msgid %d from persistence", client->
clientID, pubcomp->
msgId);
793 if (client->
good == 0)
834 if (client->sslopts->trustStore)
835 free((
void*)client->sslopts->trustStore);
836 if (client->sslopts->keyStore)
837 free((
void*)client->sslopts->keyStore);
838 if (client->sslopts->privateKey)
839 free((
void*)client->sslopts->privateKey);
840 if (client->sslopts->privateKeyPassword)
841 free((
void*)client->sslopts->privateKeyPassword);
842 if (client->sslopts->enabledCipherSuites)
843 free((
void*)client->sslopts->enabledCipherSuites);
844 if (client->sslopts->struct_version >= 2)
846 if (client->sslopts->CApath)
847 free((
void*)client->sslopts->CApath);
849 free(client->sslopts);
850 client->sslopts = NULL;
902 size_t count = dest_size;
906 if (dest_size < strlen(src))
910 while (count > 1 && (*temp++ = *src++))
927 size_t mlen = strlen(src) + 1;
928 char* temp =
malloc(mlen);
#define PERSISTENCE_PUBLISH_SENT
void MQTTProtocol_removePublication(Publications *p)
int MQTTProtocol_handlePubacks(void *pack, int sock)
Messages * MQTTProtocol_createMessage(Publish *publish, Messages **mm, int qos, int retained, int allocatePayload)
void MQTTProtocol_freeClient(Clients *client)
void MQTTProtocol_freeMessageList(List *msgList)
int MQTTProtocol_handlePubcomps(void *pack, int sock)
unsigned int msgs_received
DIFF_TIME_TYPE MQTTTime_difftime(START_TIME_TYPE new, START_TIME_TYPE old)
void MQTTProperties_free(MQTTProperties *props)
pending_writes * SocketBuffer_updateWrite(int socket, char *topic, char *payload)
int Socket_noPendingWrites(int socket)
static int MQTTProtocol_startPublishCommon(Clients *pubclient, Publish *publish, int qos, int retained)
int MQTTPacket_send_puback(int MQTTVersion, int msgid, networkHandles *net, const char *clientID)
#define PAHO_MEMORY_ERROR
int MQTTProtocol_handlePublishes(void *pack, int sock)
MQTTProperties properties
#define PERSISTENCE_V5_PUBLISH_SENT
#define PERSISTENCE_V5_PUBLISH_RECEIVED
void MQTTPacket_freePublish(Publish *pack)
int ListRemove(List *aList, void *content)
START_TIME_TYPE lastTouch
void Protocol_processPublication(Publish *publish, Clients *client, int allocatePayload)
int MQTTPacket_send_pubcomp(int MQTTVersion, int msgid, networkHandles *net, const char *clientID)
int clientSocketCompare(void *a, void *b)
ListElement * ListInsert(List *aList, void *content, size_t size, ListElement *index)
int MQTTPersistence_remove(Clients *c, char *type, int qos, int msgId)
void ListEmpty(List *aList)
MQTTProperties MQTTProperties_copy(const MQTTProperties *props)
char * MQTTStrncpy(char *dest, const char *src, size_t dest_size)
static void MQTTProtocol_retries(START_TIME_TYPE now, Clients *client, int regardless)
ListElement * ListNextElement(List *aList, ListElement **pos)
void MQTTProtocol_keepalive(START_TIME_TYPE now)
START_TIME_TYPE lastReceived
static void MQTTProtocol_storeQoS0(Clients *pubclient, Publish *publish)
int MQTTPacket_send_pubrec(int MQTTVersion, int msgid, networkHandles *net, const char *clientID)
int MQTTPacket_send_pubrel(int MQTTVersion, int msgid, int dup, networkHandles *net, const char *clientID)
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
void MQTTProtocol_retry(START_TIME_TYPE now, int doRetry, int regardless)
MQTTProperties properties
ListElement * ListAppend(List *aList, void *content, size_t size)
void MQTTProtocol_closeSession(Clients *c, int sendwill)
unsigned int ping_outstanding
char * MQTTStrdup(const char *src)
char * Socket_getpeer(int sock)
int messageIDCompare(void *a, void *b)
void MQTTProtocol_emptyMessageList(List *msgList)
START_TIME_TYPE MQTTTime_now(void)
int MQTTProtocol_handlePubrels(void *pack, int sock)
int MQTTPacket_send_pingreq(networkHandles *net, const char *clientID)
int MQTTProtocol_handlePubrecs(void *pack, int sock)
#define TCPSOCKET_COMPLETE
int MQTTProtocol_startPublish(Clients *pubclient, Publish *publish, int qos, int retained, Messages **mm)
MQTTProperties properties
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
void ListFree(List *aList)
#define PERSISTENCE_PUBLISH_RECEIVED
int MQTTPacket_send_publish(Publish *pack, int dup, int qos, int retained, networkHandles *net, const char *clientID)
Publications * MQTTProtocol_storePublication(Publish *publish, int *len)
#define TCPSOCKET_INTERRUPTED
int MQTTProtocol_assignMsgId(Clients *client)