27 #if !defined(_WINDOWS) 29 #include <sys/socket.h> 36 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0])) 52 "tcp://localhost:1883",
64 if (strcmp(argv[count],
"--test_no") == 0)
71 else if (strcmp(argv[count],
"--size") == 0)
78 else if (strcmp(argv[count],
"--connection") == 0)
85 else if (strcmp(argv[count],
"--verbose") == 0)
96 #include <sys/timeb.h> 101 #if defined(_WIN32) || defined(_WINDOWS) 111 #if defined(_WIN32) || defined(_WINDOWS) 115 gettimeofday(&ts, NULL);
116 localtime_r(&ts.tv_sec, &timeinfo);
118 strftime(msg_buf, 80,
"%Y%m%d %H%M%S", &timeinfo);
120 #if defined(_WIN32) || defined(_WINDOWS) 121 sprintf(&msg_buf[strlen(msg_buf)],
".%.3hu ", ts.millitm);
123 sprintf(&msg_buf[strlen(msg_buf)],
".%.3lu ", ts.tv_usec / 1000);
126 va_start(args, format);
127 vsnprintf(&msg_buf[strlen(msg_buf)],
sizeof(msg_buf) - strlen(msg_buf), format, args);
130 printf(
"%s\n", msg_buf);
135 #if defined(_WIN32) || defined(_WINDOWS) 136 #define mqsleep(A) Sleep(1000*A) 137 #define START_TIME_TYPE DWORD 138 static DWORD start_time = 0;
141 return GetTickCount();
144 #define mqsleep sleep 145 #define START_TIME_TYPE struct timespec 148 static struct timespec start;
149 clock_gettime(CLOCK_REALTIME, &start);
153 #define mqsleep sleep 154 #define START_TIME_TYPE struct timeval 158 struct timeval start_time;
159 gettimeofday(&start_time, NULL);
168 return GetTickCount() - start_time;
172 long elapsed(
struct timespec start)
174 struct timespec now, res;
176 clock_gettime(CLOCK_REALTIME, &now);
177 ntimersub(now, start, res);
178 return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
183 struct timeval now, res;
185 gettimeofday(&now, NULL);
186 timersub(&now, &start_time, &res);
187 return (res.tv_sec)*1000 + (res.tv_usec)/1000;
195 #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d) 196 #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e) 211 printf(
"Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
213 va_start(args, format);
214 vprintf(format, args);
218 MyLog(
LOGA_DEBUG,
"Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
257 if (++message_count == 1)
261 pubmsg.
payload =
"a much longer message that we can shorten to the extent that we need to payload up to 11";
291 pubmsg.
payload =
"a much longer message that we can shorten to the extent that we need to payload up to 11";
306 MyLog(
LOGA_DEBUG,
"In connect onSuccess callback, context %p", context);
322 MyLog(
LOGA_DEBUG,
"In connect onFailure callback, context %p", context);
341 char* serverURIs[2] = {
"tcp://localhost:1882", options.
connection};
402 MyLog(
LOGA_DEBUG,
"In connect onFailure callback, context %p", context);
412 MyLog(
LOGA_DEBUG,
"In connect onSuccess callback, context %p\n", context);
414 assert(
"Connect should not succeed", 0,
"connect success callback was called", 0);
543 pubmsg.
payload =
"a much longer message that we can shorten to the extent that we need to payload up to 11";
554 pubmsg.
payload =
"a QoS 0 message that we can shorten to the extent that we need to payload up to 11";
586 pubmsg.
payload =
"a much longer message that we can shorten to the extent that we need to payload up to 11";
618 assert(
"Should have connected", 0,
"failed to connect", NULL);
634 #define TEST3_CLIENTS 10 648 sprintf(clientdata[i].
clientid,
"async_test3_num_%d", i);
649 sprintf(clientdata[i].
test_topic,
"async test3 topic num %d", i);
650 clientdata[i].
index = i;
709 MyLog(
LOGA_DEBUG,
"In publish onSuccess callback, context %p", context);
728 "message content was %c", ((
char*)message->
payload)[i]);
733 if (++message_count == 1)
747 else if (message_count == 2)
790 ((
char*)pubmsg.
payload)[i] = rand() % 256;
806 MyLog(
LOGA_DEBUG,
"In connect onSuccess callback, context %p", context);
892 MyLog(
LOGA_DEBUG,
"In connect onSuccess callback, context %p", context);
902 MyLog(
LOGA_DEBUG,
"In connect onFailure callback, context %p", context);
919 char* serverURIs[3] = {
"tcp://localhost:1880",
"tcp://localhost:1881",
"tcp://localhost:1882"};
922 MyLog(
LOGA_INFO,
"Starting test 5a - All HA connections out of service");
981 char* serverURIs[3] = {
"tcp://localhost:1880",
"tcp://localhost:1881", options.
connection};
984 MyLog(
LOGA_INFO,
"Starting test 5b - All HA connections out of service except the last one");
1043 char* serverURIs[3] = {options.
connection,
"tcp://localhost:1881",
"tcp://localhost:1882"};
1046 MyLog(
LOGA_INFO,
"Starting test 5c - All HA connections out of service except the first one");
1097 if (strstr(message,
"onnect") && !strstr(message,
"isconnect"))
1098 printf(
"Trace : %d, %s\n", level, message);
MQTTAsync_onFailure * onFailure
int test2(struct Options options)
enum MQTTPropertyCodes value
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
START_TIME_TYPE start_clock(void)
int main(int argc, char **argv)
union MQTTAsync_successData::@46 alt
void test1_onDisconnect(void *context, MQTTAsync_successData *response)
#define MQTTAsync_responseOptions_initializer
int test5_onFailure_called
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
int test5a(struct Options options)
MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
void MQTTAsync_free(void *memory)
void MQTTAsync_freeMessage(MQTTAsync_message **message)
int MQTTAsync_unsubscribe(MQTTAsync handle, const char *topic, MQTTAsync_responseOptions *response)
void test3_onSubscribe(void *context, MQTTAsync_successData *response)
void test3_onFailure(void *context, MQTTAsync_failureData *response)
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
void test5_onConnect(void *context, MQTTAsync_successData *response)
void test2_onConnect(void *context, MQTTAsync_successData *response)
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
int test4(struct Options options)
void test1_onSubscribe(void *context, MQTTAsync_successData *response)
int test5b(struct Options options)
void test1_onConnect(void *context, MQTTAsync_successData *response)
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
long elapsed(START_TIME_TYPE start_time)
int test3_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
#define MQTTAsync_willOptions_initializer
void test5_onConnectFailure(void *context, MQTTAsync_failureData *response)
void test1_onUnsubscribe(void *context, MQTTAsync_successData *response)
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
void test4_onConnect(void *context, MQTTAsync_successData *response)
int test5c(struct Options options)
void myassert(char *filename, int lineno, char *description, int value, char *format,...)
void getopts(int argc, char **argv)
#define MQTTAsync_disconnectOptions_initializer
void test3_onUnsubscribe(void *context, MQTTAsync_successData *response)
int test3(struct Options options)
#define MQTTAsync_connectOptions_initializer
MQTTAsync_onSuccess * onSuccess
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
MQTTAsync_willOptions * will
int test5_onConnect_called
#define MQTTCLIENT_PERSISTENCE_DEFAULT
int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen, const void *payload, int qos, int retained, MQTTAsync_responseOptions *response)
void test4_onSubscribe(void *context, MQTTAsync_successData *response)
void test3_onConnect(void *context, MQTTAsync_successData *response)
void MQTTAsync_destroy(MQTTAsync *handle)
void MyLog(int LOGA_level, char *format,...)
void test3_onDisconnect(void *context, MQTTAsync_successData *response)
#define MQTTASYNC_SUCCESS
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const MQTTAsync_message *message, MQTTAsync_responseOptions *response)
void test1_onConnectFailure(void *context, MQTTAsync_failureData *response)
int test1_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
#define MQTTCLIENT_PERSISTENCE_NONE
int test4_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
void test3_onPublish(void *context, MQTTAsync_successData *response)
void test2_onFailure(void *context, MQTTAsync_failureData *response)
#define assert(a, b, c, d)
START_TIME_TYPE global_start_time
void test4_onPublish(void *context, MQTTAsync_successData *response)
#define MQTTAsync_message_initializer
MQTTAsync_onSuccess * onSuccess
volatile int test_finished
int test1(struct Options options)
int test2_onFailure_called
MQTTAsync_onSuccess * onSuccess