49 #if !defined(_WIN32) && !defined(_WIN64) 53 #if !defined(NO_PERSISTENCE) 67 #define URI_TCP "tcp://" 68 #define URI_WS "ws://" 69 #define URI_WSS "wss://" 71 #include "VersionInfo.h" 88 #define min(a, b) (((a) < (b)) ? (a) : (b)) 91 #if defined(WIN32) || defined(WIN64) 131 #if defined(_WIN32) || defined(_WIN64) 136 #if !defined(NO_HEAP_TRACKING) 151 printf(
"mqttasync_mutex error %d\n", rc);
157 printf(
"mqttcommand_mutex error %d\n", rc);
160 if ((send_sem = CreateEvent(
168 printf(
"send_sem error %d\n", rc);
171 #if !defined(NO_HEAP_TRACKING) 172 if ((
stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
175 printf(
"stack_mutex error %d\n", rc);
178 if ((
heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
181 printf(
"heap_mutex error %d\n", rc);
185 if ((
log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
188 printf(
"log_mutex error %d\n", rc);
191 if ((
socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
194 printf(
"socket_mutex error %d\n", rc);
206 void MQTTAsync_cleanup(
void)
209 CloseHandle(send_sem);
210 #if !defined(NO_HEAP_TRACKING) 224 #if defined(PAHO_MQTT_STATIC) 225 static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT;
228 BOOL CALLBACK InitMutexesOnce (
237 BOOL APIENTRY DllMain(HANDLE hModule,
238 DWORD ul_reason_for_call,
241 switch (ul_reason_for_call)
243 case DLL_PROCESS_ATTACH:
246 case DLL_THREAD_ATTACH:
248 case DLL_THREAD_DETACH:
250 case DLL_PROCESS_DETACH:
275 pthread_mutexattr_t attr;
278 pthread_mutexattr_init(&attr);
279 #if !defined(_WRS_KERNEL) 280 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
285 printf(
"MQTTAsync: error %d initializing async_mutex\n", rc);
287 printf(
"MQTTAsync: error %d initializing command_mutex\n", rc);
288 else if ((rc = pthread_mutex_init(
socket_mutex, &attr)) != 0)
289 printf(
"MQTTClient: error %d initializing socket_mutex\n", rc);
290 else if ((rc = pthread_cond_init(&send_cond->
cond, NULL)) != 0)
291 printf(
"MQTTAsync: error %d initializing send_cond cond\n", rc);
292 else if ((rc = pthread_mutex_init(&send_cond->
mutex, &attr)) != 0)
293 printf(
"MQTTAsync: error %d initializing send_cond mutex\n", rc);
439 #if !defined(NO_PERSISTENCE) 483 #if defined(_WIN32) || defined(_WIN64) 486 usleep(milliseconds*1000);
498 const int max_sleep = (int)(
min(maxInterval, currentIntervalBase) * 1.2);
499 const int min_sleep = (int)(
max(minInterval, currentIntervalBase) / 1.2);
501 if (min_sleep >= max_sleep)
510 int range = max_sleep - min_sleep + 1;
511 const int buckets = RAND_MAX / range;
512 const int limit = buckets * range;
520 }
while (r >= limit);
523 const int randResult = r / buckets;
524 return min_sleep + randResult;
547 Log(
LOG_ERROR, 0,
"Error %s locking mutex", strerror(rc));
555 Log(
LOG_ERROR, 0,
"Error %s unlocking mutex", strerror(rc));
581 #if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC) 583 BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitMutexesOnce, NULL, NULL);
588 if (serverURI == NULL || clientId == NULL)
606 if (strstr(serverURI,
"://") != NULL)
621 if (options && (strncmp(options->
struct_id,
"MQCO", 4) != 0 ||
630 #if !defined(NO_HEAP_TRACKING) 653 else if (strncmp(
URI_WS, serverURI, strlen(
URI_WS)) == 0)
655 serverURI += strlen(
URI_WS);
684 memset(m->
c,
'\0',
sizeof(
Clients));
711 #if !defined(NO_PERSISTENCE) 738 int persistence_type,
void* persistence_context)
743 persistence_context, NULL);
761 #if !defined(NO_HEAP_TRACKING) 771 #if !defined(NO_PERSISTENCE) 783 Log(
LOG_ERROR, 0,
"Error %d removing command from persistence", rc);
796 int bufindex = 0, i, nbufs = 0;
798 int props_allocated = 0;
802 switch (command->
type)
808 if (((lens = (
int*)
malloc(nbufs *
sizeof(
int))) == NULL) ||
809 ((bufs =
malloc(nbufs *
sizeof(
char *))) == NULL))
814 bufs[bufindex] = &command->
type;
815 lens[bufindex++] =
sizeof(command->
type);
817 bufs[bufindex] = &command->
token;
818 lens[bufindex++] =
sizeof(command->
token);
821 lens[bufindex++] =
sizeof(command->
details.
sub.count);
823 for (i = 0; i < command->
details.
sub.count; ++i)
825 bufs[bufindex] = command->
details.
sub.topics[i];
826 lens[bufindex++] = (int)strlen(command->
details.
sub.topics[i]) + 1;
830 bufs[bufindex] = &command->
details.
sub.qoss[i];
831 lens[bufindex++] =
sizeof(command->
details.
sub.qoss[i]);
838 lens[bufindex++] =
sizeof(command->
details.
sub.opts);
842 bufs[bufindex] = &command->
details.
sub.optlist[i];
843 lens[bufindex++] =
sizeof(command->
details.
sub.optlist[i]);
853 if (((lens = (
int*)
malloc(nbufs *
sizeof(
int))) == NULL) ||
854 ((bufs =
malloc(nbufs *
sizeof(
char *))) == NULL))
860 bufs[bufindex] = &command->
type;
861 lens[bufindex++] =
sizeof(command->
type);
863 bufs[bufindex] = &command->
token;
864 lens[bufindex++] =
sizeof(command->
token);
867 lens[bufindex++] =
sizeof(command->
details.
unsub.count);
872 lens[bufindex++] = (int)strlen(command->
details.
unsub.topics[i]) + 1;
879 if (((lens = (
int*)
malloc(nbufs *
sizeof(
int))) == NULL) ||
880 ((bufs =
malloc(nbufs *
sizeof(
char *))) == NULL))
886 bufs[bufindex] = &command->
type;
887 lens[bufindex++] =
sizeof(command->
type);
889 bufs[bufindex] = &command->
token;
890 lens[bufindex++] =
sizeof(command->
token);
892 bufs[bufindex] = command->
details.
pub.destinationName;
893 lens[bufindex++] = (int)strlen(command->
details.
pub.destinationName) + 1;
895 bufs[bufindex] = &command->
details.
pub.payloadlen;
896 lens[bufindex++] =
sizeof(command->
details.
pub.payloadlen);
898 bufs[bufindex] = command->
details.
pub.payload;
899 lens[bufindex++] = command->
details.
pub.payloadlen;
902 lens[bufindex++] =
sizeof(command->
details.
pub.qos);
904 bufs[bufindex] = &command->
details.
pub.retained;
905 lens[bufindex++] =
sizeof(command->
details.
pub.retained);
927 if ((ptr = bufs[bufindex] =
malloc(temp_len)) == NULL)
932 props_allocated = bufindex;
934 lens[bufindex++] = temp_len;
946 Log(
LOG_ERROR, 0,
"Error persisting command, rc %d", rc);
950 if (props_allocated > 0)
951 free(bufs[props_allocated]);
969 if (qcommand == NULL)
981 command->
type = *(
int*)ptr;
987 switch (command->
type)
1024 for (i = 0; i < command->
details.
sub.count; ++i)
1026 data_size = strlen(ptr) + 1;
1077 data_size = strlen(ptr) + 1;
1091 data_size = strlen(ptr) + 1;
1100 strcpy(command->
details.
pub.destinationName, ptr);
1104 command->
details.
pub.payloadlen = *(
int*)ptr;
1116 memcpy(command->
details.
pub.payload, ptr, data_size);
1135 Log(
LOG_ERROR, -1,
"Error restoring properties from persistence");
1171 if (array_size > 0 && seqno > keyloc_array[array_size - 1].seqno)
1173 low = array_size - 1;
1175 else if (array_size > 0)
1177 int high = array_size - 1;
1178 int divide_index = array_size / 2;
1182 while (high - low > 1)
1184 if (seqno < keyloc_array[divide_index].seqno)
1185 high = divide_index;
1189 divide_index = (high - low) / 2 + low;
1197 insert_point = keyloc_array[low].
elem;
1199 insert_point = insert_point->
next;
1203 inserted =
ListInsert(list, content, size, insert_point);
1205 if (array_size > 0 && low + 1 < array_size)
1209 memmove(&keyloc_array[low+2], &keyloc_array[low+1], (array_size - low - 1) *
sizeof(
struct keyloc));
1211 keyloc_array[low+1].
seqno = seqno;
1212 keyloc_array[low+1].
elem = inserted;
1220 static int cmpkeys(
const void *p1,
const void *p2)
1222 int key1 = atoi(strchr(*(
char *
const *)p1,
'-') + 1);
1223 int key2 = atoi(strchr(*(
char *
const *)p2,
'-') + 1);
1225 return (key1 == key2) ? 0 : ((key1 < key2) ? -1 : 1);
1236 int commands_restored = 0;
1246 qsort(msgkeys, (
size_t)nkeys,
sizeof(
char*),
cmpkeys);
1248 if (keyloc_array == NULL)
1254 if (sentinel == NULL)
1260 sentinel->
seqno = -1;
1261 keyloc_array[0].
seqno = -1;
1264 while (rc == 0 && i < nkeys)
1266 char *buffer = NULL;
1286 cmd->
key =
malloc(strlen(msgkeys[i])+1);
1287 strcpy(cmd->
key, msgkeys[i]);
1293 cmd->
seqno = atoi(strchr(msgkeys[i],
'-')+1);
1298 commands_restored++;
1307 if (msgkeys != NULL)
1329 int messages_deleted = 0;
1334 while (rc == 0 && i < nkeys)
1344 Log(
LOG_ERROR, 0,
"Error %d removing queued message from persistence", rc);
1350 if (msgkeys != NULL)
1365 int messages_deleted = 0;
1370 while (rc == 0 && i < nkeys)
1382 Log(
LOG_ERROR, 0,
"Error %d removing inflight message from persistence", rc);
1388 if (msgkeys != NULL)
1434 if (commands->
first)
1448 #if !defined(NO_PERSISTENCE) 1468 strcpy(command->
key, key);
1494 first_publish = cmd;
1503 #if !defined(NO_PERSISTENCE) 1514 #if !defined(_WIN32) && !defined(_WIN64) 1604 if (m->
cl && was_connected)
1615 memset(&data,
'\0',
sizeof(data));
1753 cur_response = NULL;
1765 if (rc == 1 && command->
details.
pub.qos == 0)
1846 List* ignored_clients = NULL;
1878 Log(
TRACE_MIN, -1,
"Blocking on server receive maximum for client %s",
1895 #if !defined(NO_PERSISTENCE) 1902 char* buffer = NULL;
1912 command->
key = NULL;
1918 Log(
LOG_ERROR, -1,
"Error restoring command: rc %d from pget\n", rc);
1945 else if (strncmp(
URI_WS, serverURI, strlen(
URI_WS)) == 0)
1947 serverURI += strlen(
URI_WS);
1950 #if defined(OPENSSL) 1977 #if defined(OPENSSL) 1978 #if defined(__GNUC__) && defined(__linux__) 1986 #if defined(__GNUC__) && defined(__linux__) 2002 if (rc == EINPROGRESS)
2229 rc = (command != NULL);
2352 if (callback_rc == 1)
2381 Log(
TRACE_MIN, -1,
"Automatically attempting to reconnect");
2404 while (commands->
count > 0)
2409 #if !defined(_WIN32) && !defined(_WIN64) 2411 Log(
LOG_ERROR, -1,
"Error %d waiting for condition variable", rc);
2414 Log(
LOG_ERROR, -1,
"Error %d waiting for semaphore", rc);
2425 #if defined(_WIN32) || defined(_WIN64) 2514 if (command->
client == m)
2574 #if !defined(NO_PERSISTENCE) 2583 free(saved_clientid);
2619 free((*message)->payload);
2665 Log(
LOG_PROTOCOL, -1,
"Cleaning session state on connect because sessionPresent is 0");
2684 #if !defined(_WIN32) && !defined(_WIN64) 2723 Log(
TRACE_MINIMUM, -1,
"Could not find client corresponding to socket %d", sock);
2730 Log(
LOG_ERROR, -1,
"Client structure was NULL for socket %d - removing socket", sock);
2736 Log(
TRACE_MINIMUM, -1,
"Error from MQTTAsync_cycle() - removing socket %d", sock);
2753 #if !defined(NO_PERSISTENCE) 2760 Log(
TRACE_MIN, -1,
"False returned from messageArrived for client %s, message remains on queue",
2768 int sessionPresent = connack->
flags.
bits.sessionPresent;
2785 memset(&data,
'\0',
sizeof(data));
2793 data.
alt.
connect.sessionPresent = sessionPresent;
2806 data.
alt.
connect.sessionPresent = sessionPresent;
2814 char* reason = (
onSuccess) ?
"connect onSuccess called" :
"automatic reconnect";
2846 Log(
LOG_ERROR, -1,
"Subscribe command not removed from command list");
2882 *element++ = *(
int*)(cur_qos->
content);
2918 *element++ = *(
int*)(cur_qos->
content);
2944 Log(
LOG_ERROR, -1,
"Unsubscribe command not removed from command list");
2952 memset(&data,
'\0',
sizeof(data));
3002 #if !defined(_WIN32) && !defined(_WIN64) 3010 #if defined(_WIN32) || defined(_WIN64) 3019 #if !defined(NOSTACKTRACE) 3029 if (handles != NULL)
3041 if (conn_count == 0)
3052 #if !defined(NOSTACKTRACE) 3278 #if defined(OPENSSL) 3279 SSL_SESSION_free(client->session);
3280 client->session = NULL;
3285 #if defined(OPENSSL) 3286 client->
net.ssl = NULL;
3334 #if !defined(NO_PERSISTENCE) 3346 Log(
LOG_ERROR, -1,
"cleanSession: did not find client structure in handles list");
3385 Log(
TRACE_MIN, -1,
"Calling messageArrived for client %s, queue depth %d",
3387 rc = (*(m->
ma))(m->
maContext, topicName, (int)topicLen, mm);
3407 if (allocatePayload)
3434 Log(
LOG_ERROR, -1,
"processPublication: did not find client structure in handles list");
3454 #if !defined(NO_PERSISTENCE) 3460 publish->
topic = NULL;
3469 int proposed = keepalive / 10;
3473 else if (proposed > 5)
3489 if (options == NULL)
3501 #if defined(OPENSSL) 3502 if (m->
ssl && options->
ssl == NULL)
3631 const void* source = NULL;
3667 #if defined(OPENSSL) 3670 if (m->
c->sslopts->trustStore)
3671 free((
void*)m->
c->sslopts->trustStore);
3672 if (m->
c->sslopts->keyStore)
3673 free((
void*)m->
c->sslopts->keyStore);
3674 if (m->
c->sslopts->privateKey)
3675 free((
void*)m->
c->sslopts->privateKey);
3676 if (m->
c->sslopts->privateKeyPassword)
3677 free((
void*)m->
c->sslopts->privateKeyPassword);
3678 if (m->
c->sslopts->enabledCipherSuites)
3679 free((
void*)m->
c->sslopts->enabledCipherSuites);
3680 if (m->
c->sslopts->struct_version >= 2)
3682 if (m->
c->sslopts->CApath)
3683 free((
void*)m->
c->sslopts->CApath);
3685 free((
void*)m->
c->sslopts);
3686 m->
c->sslopts = NULL;
3709 if (m->
c->sslopts->struct_version >= 1)
3711 if (m->
c->sslopts->struct_version >= 2)
3713 m->
c->sslopts->verify = options->
ssl->
verify;
3717 if (m->
c->sslopts->struct_version >= 3)
3722 if (m->
c->sslopts->struct_version >= 4)
3728 if (m->
c->sslopts->struct_version >= 5)
3730 m->
c->sslopts->protos = options->
ssl->
protos;
3863 if (m == NULL || m->
c == NULL)
3978 start_msgid = m->
c->
msgID;
3979 msgid = start_msgid;
3982 msgid = (msgid ==
MAX_MSG_ID) ? 1 : msgid + 1;
3987 msgid = (msgid ==
MAX_MSG_ID) ? 1 : msgid + 1;
3988 if (msgid == start_msgid)
4013 if (m == NULL || m->
c == NULL)
4017 else for (i = 0; i <
count; i++)
4024 if (qos[i] < 0 || qos[i] > 2)
4084 for (i = 0; i <
count; ++i)
4089 for (i = 0; i <
count; ++i)
4101 for (i = 0; i <
count; ++i)
4140 if (m == NULL || m->
c == NULL)
4144 else for (i = 0; i <
count; i++)
4199 for (i = 0; i <
count; ++i)
4239 if (m == NULL || m->
c == NULL)
4256 else if (qos < 0 || qos > 2)
4335 if (message == NULL)
4340 if (strncmp(message->
struct_id,
"MQTM", 4) != 0 ||
4381 #if defined(OPENSSL) 4393 else if (strncmp(
URI_WS, serverURI, strlen(
URI_WS)) == 0)
4395 serverURI += strlen(
URI_WS);
4396 #if defined(OPENSSL) 4400 #if defined(OPENSSL) 4419 if ((rc = getsockopt(m->
c->
net.
socket, SOL_SOCKET, SO_ERROR, (
char*)&error, &len)) == 0)
4427 #if defined(OPENSSL) 4431 size_t hostname_len;
4432 int setSocketForSSLrc = 0;
4442 serverURI, hostname_len);
4446 if (m->
c->session != NULL)
4447 if ((rc = SSL_set_session(m->
c->
net.ssl, m->
c->session)) != 1)
4448 Log(
TRACE_MIN, -1,
"Failed to set SSL session with stored data, non critical");
4449 rc = m->
c->sslopts->struct_version >= 3 ?
4451 m->
c->sslopts->verify, m->
c->sslopts->ssl_error_cb, m->
c->sslopts->ssl_error_context) :
4453 m->
c->sslopts->verify, NULL, NULL);
4484 m->
c->session = SSL_get1_session(m->
c->
net.ssl);
4515 #if defined(OPENSSL) 4519 #if defined(OPENSSL) 4522 rc = m->
c->sslopts->struct_version >= 3 ?
4524 m->
c->sslopts->verify, m->
c->sslopts->ssl_error_cb, m->
c->sslopts->ssl_error_context) :
4526 m->
c->sslopts->verify, NULL, NULL);
4531 m->
c->session = SSL_get1_session(m->
c->
net.ssl);
4571 struct timeval tp = {0L, 0L};
4577 tp.tv_sec = timeout / 1000;
4578 tp.tv_usec = (timeout % 1000) * 1000;
4581 #if defined(OPENSSL) 4587 if (!
tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
4589 #if defined(OPENSSL) 4607 Log(
TRACE_MINIMUM, -1,
"CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
4649 Log(
LOG_ERROR, -1,
"PUBCOMP, PUBACK or PUBREC received for no client, msgid %d", msgid);
4666 Log(
LOG_ERROR, -1,
"Publish command not removed from command list");
4791 (*tokens)[count++] = m->
msgid;
4794 (*tokens)[
count] = -1;
4858 if (m == NULL || m->
c == NULL)
4911 #define MAX_INFO_STRINGS 8 4915 libinfo[i].
name =
"Product name";
4916 libinfo[i++].
value =
"Eclipse Paho Asynchronous MQTT C Client Library";
4918 libinfo[i].
name =
"Version";
4919 libinfo[i++].
value = CLIENT_VERSION;
4921 libinfo[i].
name =
"Build level";
4922 libinfo[i++].
value = BUILD_TIMESTAMP;
4923 #if defined(OPENSSL) 4924 libinfo[i].
name =
"OpenSSL version";
4925 libinfo[i++].
value = SSLeay_version(SSLEAY_VERSION);
4927 libinfo[i].
name =
"OpenSSL flags";
4928 libinfo[i++].
value = SSLeay_version(SSLEAY_CFLAGS);
4930 libinfo[i].
name =
"OpenSSL build timestamp";
4931 libinfo[i++].
value = SSLeay_version(SSLEAY_BUILT_ON);
4933 libinfo[i].
name =
"OpenSSL platform";
4934 libinfo[i++].
value = SSLeay_version(SSLEAY_PLATFORM);
4936 libinfo[i].
name =
"OpenSSL directory";
4937 libinfo[i++].
value = SSLeay_version(SSLEAY_DIR);
4939 libinfo[i].
name = NULL;
4940 libinfo[i].
value = NULL;
4946 static char buf[30];
4954 return "Persistence error";
4956 return "Disconnected";
4958 return "Maximum in-flight messages amount reached";
4960 return "Invalid UTF8 string";
4962 return "Invalid (NULL) parameter";
4964 return "Topic containing NULL characters has been truncated";
4966 return "Bad structure";
4968 return "Invalid QoS value";
4970 return "Too many pending commands";
4972 return "Operation discarded before completion";
4974 return "No more messages can be buffered";
4976 return "SSL is not supported";
4978 return "Invalid protocol scheme";
4980 return "Options for wrong MQTT version";
4982 return "Client created for another version of MQTT";
4984 return "Zero length will topic on connect";
4987 sprintf(buf,
"Unknown error code %d", code);
static void MQTTAsync_unlock_mutex(mutex_type amutex)
static int clientCompareConnectCommand(void *a, void *b)
static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command *command)
MQTTAsync_onFailure * onFailure
int Thread_post_sem(sem_type sem)
MQTTProperties properties
int WebSocket_proxy_connect(networkHandles *net, int ssl, const char *hostname)
#define PERSISTENCE_PUBLISH_SENT
void MQTTProtocol_removePublication(Publications *p)
MQTTProperties properties
int MQTTProtocol_handlePubacks(void *pack, int sock)
#define PROXY_CONNECT_IN_PROGRESS
START_TIME_TYPE lastConnectionFailedTime
const char * client_timestamp_eye
int Thread_wait_cond(cond_type condvar, int timeout)
void MQTTPacket_freeAck(Ack *pack)
int MQTTProperties_read(MQTTProperties *properties, char **pptr, char *enddata)
static pthread_mutex_t mqttcommand_mutex_store
MQTTProperties properties
#define MQTTAsync_successData5_initializer
void onSuccess(void *context, MQTTAsync_successData *response)
unsigned int command_seqno
#define MQTTASYNC_NULL_PARAMETER
static mutex_type mqttcommand_mutex
struct MQTTAsync_successData5::@49::@52 connect
int MQTTProperties_len(MQTTProperties *props)
int MQTTAsync_createWithOptions(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTAsync_createOptions *options)
#define MQTTASYNC_SSL_NOT_SUPPORTED
#define WebSocket_CLOSE_NORMAL
void MQTTAsync_traceCallback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
void WebSocket_close(networkHandles *net, int status_code, const char *reason)
MQTTAsync_onFailure5 * onFailure5
int MQTTProperties_write(char **pptr, const MQTTProperties *properties)
void MQTTAsync_init_rand(void)
#define MQTTCLIENT_SUCCESS
void MQTTProtocol_freeClient(Clients *client)
MQTTPersistence_beforeWrite * beforeWrite
static cond_type_struct send_cond_store
static thread_return_type WINAPI MQTTAsync_receiveThread(void *n)
#define MQTT_DEFAULT_PORT
static mutex_type heap_mutex
int UTF8_validateString(const char *string)
START_TIME_TYPE start_time
int MQTTProtocol_handlePubcomps(void *pack, int sock)
union MQTTAsync_successData::@46 alt
int SSLSocket_setSocketForSSL(networkHandles *net, MQTTClient_SSLOptions *opts, const char *hostname, size_t hostname_len)
void WebSocket_terminate(void)
void Log_setTraceLevel(enum LOG_LEVELS level)
int MQTTProtocol_connect(const char *ip_address, Clients *aClient, int websocket, int MQTTVersion, MQTTProperties *connectProperties, MQTTProperties *willProperties)
int MQTTAsync_setConnectionLostCallback(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl)
DIFF_TIME_TYPE MQTTTime_difftime(START_TIME_TYPE new, START_TIME_TYPE old)
int sendWhileDisconnected
static void MQTTAsync_freeServerURIs(MQTTAsyncs *m)
void MQTTProperties_free(MQTTProperties *props)
MQTTProperties properties
#define MQTTASYNC_FAILURE
void MQTTPacket_freeConnack(Connack *pack)
int Socket_noPendingWrites(int socket)
struct Connack::@62::@63 bits
int disableDefaultTrustStore
MQTTProperties properties
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
void * MQTTAsync_malloc(size_t size)
#define PAHO_MEMORY_ERROR
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
const MQTTClient_nameValue * httpHeaders
struct MQTTAsync_successData5::@49::@53 unsub
#define MQTTASYNC_BAD_UTF8_STRING
int MQTTPacket_send_connect(Clients *client, int MQTTVersion, MQTTProperties *connectProperties, MQTTProperties *willProperties)
int Socket_getReadySocket(int more_work, struct timeval *tp, mutex_type mutex)
MQTTSubscribe_options * subscribeOptionsList
int MQTTProtocol_handlePublishes(void *pack, int sock)
int Thread_lock_mutex(mutex_type mutex)
int MQTTPacket_send_disconnect(Clients *client, enum MQTTReasonCodes reason, MQTTProperties *props)
int SSLSocket_connect(SSL *ssl, int sock, const char *hostname, int verify, int(*cb)(const char *str, size_t len, void *u), void *u)
const char * client_version_eye
#define PERSISTENCE_V5_PUBLISH_SENT
int MQTTAsync_setDeliveryCompleteCallback(MQTTAsync handle, void *context, MQTTAsync_deliveryComplete *dc)
static int MQTTAsync_unpersistInflightMessages(Clients *c)
#define PERSISTENCE_V5_PUBLISH_RECEIVED
MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
static int MQTTAsync_checkConn(MQTTAsync_command *command, MQTTAsyncs *client)
void MQTTAsync_free(void *memory)
enum MQTTReasonCodes reasonCode
void * updateConnectOptions_context
void MQTTAsync_freeMessage(MQTTAsync_message **message)
#define MQTTASYNC_DISCONNECTED
int MQTTAsync_setConnected(MQTTAsync handle, void *context, MQTTAsync_connected *connected)
int MQTTAsync_unsubscribe(MQTTAsync handle, const char *topic, MQTTAsync_responseOptions *response)
static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
static l_noret error(LoadState *S, const char *why)
void MQTTAsync_connectionLost(void *context, char *cause)
MQTTAsync_connectionLost * cl
MQTTAsync_onSuccess5 * onSuccess5
static void MQTTAsync_terminate(void)
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
int ListRemove(List *aList, void *content)
struct MQTTAsync_successData::@46::@48 connect
long elapsed(START_TIME_TYPE start_time)
struct ListElementStruct * next
START_TIME_TYPE MQTTTime_start_clock(void)
int ListDetach(List *aList, void *content)
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
static void MQTTProtocol_checkPendingWrites(void)
void Log_setTraceCallback(Log_traceCallback *callback)
MQTTProperties properties
#define PERSISTENCE_V5_COMMAND_KEY
int MQTTProperties_getNumericValue(MQTTProperties *props, enum MQTTPropertyCodes propid)
START_TIME_TYPE lastTouch
void Protocol_processPublication(Publish *publish, Clients *client, int allocatePayload)
union MQTTAsync_successData5::@49 alt
static mutex_type stack_mutex
int MQTTAsync_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
#define PERSISTENCE_V5_QUEUE_KEY
static MQTTPacket * MQTTAsync_cycle(int *sock, unsigned long timeout, int *rc)
void * beforeWrite_context
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
static pthread_mutex_t socket_mutex_store
ListElement * ListInsert(List *aList, void *content, size_t size, ListElement *index)
void MQTTAsync_onSuccess5(void *context, MQTTAsync_successData5 *response)
#define MQTTASYNC_OPERATION_INCOMPLETE
struct MQTTAsync_command::@39::@44 conn
void Socket_setWriteCompleteCallback(Socket_writeComplete *mywritecomplete)
static void nextOrClose(MQTTAsyncs *m, int rc, char *message)
int MQTTProtocol_handleUnsubacks(void *pack, int sock)
void Socket_clearPendingWrite(int socket)
void ListEmpty(List *aList)
MQTTProperties MQTTProperties_copy(const MQTTProperties *props)
int MQTTAsync_setAfterPersistenceRead(MQTTAsync handle, void *context, MQTTPersistence_afterRead *co)
static int MQTTAsync_Socket_noPendingWrites(int socket)
struct MQTTAsync_command::@39::@40 sub
MQTTAsync_messageArrived * ma
static int cmdMessageIDCompare(void *a, void *b)
void MQTTAsync_sleep(long milliseconds)
MQTTProperties * connectProps
#define PERSISTENCE_COMMAND_KEY
#define MQTTASYNC_BAD_MQTT_OPTION
MQTTAsync_updateConnectOptions * updateConnectOptions
ListElement * ListNextElement(List *aList, ListElement **pos)
int MQTTAsync_updateConnectOptions(void *context, MQTTAsync_connectData *data)
MQTTAsync_onFailure5 * onFailure5
#define MQTTASYNC_BAD_QOS
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
void MQTTProtocol_keepalive(START_TIME_TYPE now)
int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
static void MQTTAsync_checkTimeouts(void)
struct MQTTAsync_command::@39::@41 unsub
int MQTTAsync_setDisconnected(MQTTAsync handle, void *context, MQTTAsync_disconnected *disconnected)
static mutex_type log_mutex
void * disconnected_context
int ListRemoveItem(List *aList, void *content, int(*callback)(void *, void *))
MQTTClient_persistence * persistence
#define MQTTAsync_disconnectOptions_initializer
void SSLSocket_handleOpensslInit(int bool_value)
int Thread_unlock_mutex(mutex_type mutex)
static int MQTTAsync_processCommand(void)
const char * MQTTPacket_name(int ptype)
MQTTAsync_onFailure * onFailure
enum MQTTAsync_threadStates receiveThread_state
int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char *const *topic, MQTTAsync_responseOptions *response)
MQTTAsync_onFailure5 * onFailure5
#define MQTTAsync_connectData_initializer
#define MQTTVERSION_3_1_1
static thread_return_type WINAPI MQTTAsync_sendThread(void *n)
static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions *options, int internal)
ROSLIB_DECL std::string command(const std::string &cmd)
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
int WebSocket_upgrade(networkHandles *net)
static mutex_type mqttasync_mutex
struct MQTTAsync_willOptions::@54 payload
void * MQTTPacket_Factory(int MQTTVersion, networkHandles *net, int *error)
struct MQTTAsync_connectData::@45 binarypwd
void MQTTProtocol_retry(START_TIME_TYPE now, int doRetry, int regardless)
unsigned int(* ssl_psk_cb)(const char *hint, char *identity, unsigned int max_identity_len, unsigned char *psk, unsigned int max_psk_len, void *u)
int MQTTAsync_setBeforePersistenceWrite(MQTTAsync handle, void *context, MQTTPersistence_beforeWrite *co)
MQTTProperties properties
int MQTTAsync_isConnected(MQTTAsync handle)
MQTTAsync_deliveryComplete * dc
static int MQTTAsync_connecting(MQTTAsyncs *m)
MQTTAsync_onFailure * onFailure
void Socket_addPendingWrite(int socket)
static int clientStructCompare(void *a, void *b)
int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char *const *topic, int *qos, MQTTAsync_responseOptions *response)
void MQTTTime_sleep(ELAPSED_TIME_TYPE milliseconds)
int MQTTProtocol_unsubscribe(Clients *client, List *topics, int msgID, MQTTProperties *props)
struct MQTTAsync_connectOptions::@55 binarypwd
#define MQTTASYNC_NO_MORE_MSGIDS
ListElement * ListAppend(List *aList, void *content, size_t size)
MQTTAsync_onSuccess5 * onSuccess5
#define PERSISTENCE_SEQNO_LIMIT
static void MQTTAsync_closeOnly(Clients *client, enum MQTTReasonCodes reasonCode, MQTTProperties *props)
int SSLSocket_getPendingRead(void)
MQTTAsync_SSLOptions * ssl
static int clientSockCompare(void *a, void *b)
#define MQTTASYNC_MAX_BUFFERED_MESSAGES
MQTTAsync_disconnected * disconnected
int MQTTPersistence_restoreMessageQueue(Clients *c)
MQTTAsync_onSuccess * onSuccess
#define WEBSOCKET_IN_PROGRESS
void MQTTProtocol_closeSession(Clients *c, int sendwill)
#define MQTTASYNC_BAD_STRUCTURE
MQTTAsync_command disconnect
int subscribeOptionsCount
static cond_type send_cond
MQTTAsync_onFailure5 * onFailure5
MQTTAsync_createOptions * createOptions
static int MQTTAsync_cleanSession(Clients *client)
MQTTAsync_willOptions * will
static void MQTTAsync_retry(void)
const char * privateKeyPassword
MQTTSubscribe_options * optlist
#define MQTTASYNC_WRONG_MQTT_VERSION
void MQTTAsync_disconnected(void *context, MQTTProperties *properties, enum MQTTReasonCodes reasonCode)
struct MQTTSubscribe_options MQTTSubscribe_options
const char * enabledCipherSuites
MQTTProperties properties
int MQTTPersistence_close(Clients *c)
#define MQTTCLIENT_PERSISTENCE_DEFAULT
int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
static thread_id_type sendThread_id
int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen, const void *payload, int qos, int retained, MQTTAsync_responseOptions *response)
struct MQTTAsync_struct MQTTAsyncs
static void MQTTAsync_freeResponses(MQTTAsyncs *m)
enum MQTTAsync_threadStates sendThread_state
int SSLSocket_close(networkHandles *net)
int MQTTProperties_hasProperty(MQTTProperties *props, enum MQTTPropertyCodes propid)
#define MQTT_BAD_SUBSCRIBE
unsigned int ping_outstanding
#define SECURE_MQTT_DEFAULT_PORT
static int retryLoopInterval
MQTTProperties properties
void MQTTAsync_onSuccess(void *context, MQTTAsync_successData *response)
static void MQTTAsync_emptyMessageQueue(Clients *client)
void MQTTAsync_destroy(MQTTAsync *handle)
Persistence_remove premove
char * MQTTStrdup(const char *src)
int Thread_wait_sem(sem_type sem, int timeout)
void MQTTAsync_connected(void *context, char *cause)
static int MQTTAsync_completeConnection(MQTTAsyncs *m, Connack *connack)
int SSLSocket_initialize(void)
void Heap_terminate(void)
static int MQTTAsync_persistCommand(MQTTAsync_queuedCommand *qcmd)
static int MQTTAsync_getNoBufferedMessages(MQTTAsyncs *m)
int MQTTProtocol_handlePingresps(void *pack, int sock)
void MQTTAsync_global_init(MQTTAsync_init_options *inits)
int Heap_initialize(void)
#define MQTTASYNC_SUCCESS
#define PERSISTENCE_MAX_KEY_LENGTH
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
static pthread_mutex_t mqttasync_mutex_store
static void MQTTAsync_lock_mutex(mutex_type amutex)
int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const MQTTAsync_message *message, MQTTAsync_responseOptions *response)
int MQTTPersistence_unpersistQueueEntry(Clients *client, MQTTPersistence_qEntry *qe)
int messageIDCompare(void *a, void *b)
int MQTTPersistence_create(MQTTClient_persistence **persistence, int type, void *pcontext)
const MQTTAsync_nameValue * httpHeaders
MQTTSubscribe_options opts
void MQTTProtocol_emptyMessageList(List *msgList)
#define thread_return_type
MQTTAsync_command command
thread_id_type Thread_getid(void)
int Log_initialize(Log_nameValue *info)
static thread_id_type receiveThread_id
struct MQTTAsync_successData::@46::@47 pub
union MQTTAsync_command::@39 details
START_TIME_TYPE MQTTTime_now(void)
enum MQTTReasonCodes reasonCode
static int cmpkeys(const void *p1, const void *p2)
int MQTTProtocol_handlePubrels(void *pack, int sock)
static volatile int global_initialized
#define MQTTAsync_failureData5_initializer
int MQTTAsync_reconnect(MQTTAsync handle)
#define MQTTASYNC_PERSISTENCE_ERROR
int ListRemoveHead(List *aList)
MQTTProperties * willProperties
int MQTTProtocol_subscribe(Clients *client, List *topics, List *qoss, int msgID, MQTTSubscribe_options *opts, MQTTProperties *props)
static ClientStates ClientState
const char * MQTTAsync_strerror(int code)
static int MQTTAsync_unpersistCommandsAndMessages(Clients *c)
static void MQTTAsync_writeComplete(int socket, int rc)
static void setRetryLoopInterval(int keepalive)
MQTTSubscribe_options subscribeOptions
static MQTTAsync_queuedCommand * MQTTAsync_restoreCommand(char *buffer, int buflen, int MQTTVersion, MQTTAsync_queuedCommand *)
void MQTTAsync_onFailure(void *context, MQTTAsync_failureData *response)
#define PERSISTENCE_QUEUE_KEY
static void MQTTAsync_insertInOrder(List *list, void *content, int size, struct keyloc *keyloc_array, int array_size)
List * ListInitialize(void)
int Thread_signal_cond(cond_type condvar)
#define MQTTASYNC_BAD_PROTOCOL
#define ELAPSED_TIME_TYPE
int WebSocket_connect(networkHandles *net, const char *uri)
void Log_traceCallback(enum LOG_LEVELS level, const char *message)
int MQTTProtocol_handleSubacks(void *pack, int sock)
thread_type Thread_start(thread_fn fn, void *parameter)
size_t MQTTProtocol_addressPort(const char *uri, int *port, const char **topic, int default_port)
#define PERSISTENCE_PUBREL
void MQTTAsync_onFailure5(void *context, MQTTAsync_failureData5 *response)
void ListFreeNoContent(List *aList)
int MQTTProtocol_handlePubrecs(void *pack, int sock)
MQTTAsync_onFailure * onFailure
#define TCPSOCKET_COMPLETE
static int MQTTAsync_assignMsgId(MQTTAsyncs *m)
int MQTTAsync_setMessageArrivedCallback(MQTTAsync handle, void *context, MQTTAsync_messageArrived *ma)
MQTTPersistence_afterRead * afterRead
MQTTAsync_onSuccess5 * onSuccess5
int MQTTProtocol_startPublish(Clients *pubclient, Publish *publish, int qos, int retained, Messages **mm)
static void MQTTAsync_stop(void)
MQTTProperties properties
int MQTTAsync_randomJitter(int currentIntervalBase, int minInterval, int maxInterval)
static mutex_type socket_mutex
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
#define MQTTSubscribe_options_initializer
#define PERSISTENCE_V5_PUBREL
int allowDisconnectedSendAtAnyTime
static void MQTTAsync_freeCommands(MQTTAsyncs *m)
enum MQTTReasonCodes reasonCode
void Socket_close(int socket)
MQTTProperties properties
ListElement * ListFind(List *aList, void *content)
static void MQTTAsync_startConnectRetry(MQTTAsyncs *m)
static int MQTTAsync_restoreCommands(MQTTAsyncs *client)
#define MQTTASYNC_TOPICNAME_TRUNCATED
#define MQTTAsync_message_initializer
static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout)
static int MQTTAsync_addCommand(MQTTAsync_queuedCommand *command, int command_size)
static int MQTTAsync_deliverMessage(MQTTAsyncs *m, char *topicName, size_t topicLen, MQTTAsync_message *mm)
void ListFree(List *aList)
#define PERSISTENCE_PUBLISH_RECEIVED
struct MQTTAsync_command::@39::@42 pub
int MQTTPersistence_afterRead(void *context, char **buffer, int *buflen)
int MQTTPersistence_persistQueueEntry(Clients *aclient, MQTTPersistence_qEntry *qe)
void Socket_outInitialize(void)
int MQTTAsync_setUpdateConnectOptions(MQTTAsync handle, void *context, MQTTAsync_updateConnectOptions *updateOptions)
struct MQTTAsync_command::@39::@43 dis
MQTTAsync_onSuccess * onSuccess
ELAPSED_TIME_TYPE MQTTTime_elapsed(START_TIME_TYPE milliseconds)
struct MQTTAsync_successData5::@49::@51 pub
MQTTAsync_command connect
#define MQTTVERSION_DEFAULT
MQTTAsync_connected * connected
MQTTProperties * willProps
void MQTTAsync_deliveryComplete(void *context, MQTTAsync_token token)
struct MQTTAsync_successData5::@49::@50 sub
MQTTAsync_onSuccess5 * onSuccess5
int MQTTPersistence_initialize(Clients *c, const char *serverURI)
#define MQTTASYNC_MAX_MESSAGES_INFLIGHT
static void MQTTAsync_closeSession(Clients *client, enum MQTTReasonCodes reasonCode, MQTTProperties *props)
#define TCPSOCKET_INTERRUPTED
#define MQTTASYNC_0_LEN_WILL_TOPIC
MQTTAsync_onSuccess * onSuccess
unsigned int cleansession
static int MQTTAsync_unpersistCommand(MQTTAsync_queuedCommand *qcmd)
MQTTAsync_command * pending_write
MQTTAsync_onSuccess * onSuccess
int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
int MQTTPersistence_beforeWrite(void *context, int bufcount, char *buffers[], int buflens[])
int(* ssl_error_cb)(const char *str, size_t len, void *u)
MQTTProperties * connectProperties
#define MQTTProperties_initializer
const unsigned char * protos