29 #if !defined(_WINDOWS) 64 "tcp://localhost:1884",
67 "tcp://localhost:7777",
68 "Eclipse/Paho/restart_test",
69 "Eclipse/Paho/restart_test/control",
86 if (strcmp(argv[count],
"--qos") == 0)
90 if (strcmp(argv[count],
"0") == 0)
92 else if (strcmp(argv[count],
"1") == 0)
94 else if (strcmp(argv[count],
"2") == 0)
102 else if (strcmp(argv[count],
"--slot_no") == 0)
105 opts.slot_no = atoi(argv[count]);
109 else if (strcmp(argv[count],
"--connection") == 0)
116 else if (strcmp(argv[count],
"--connections") == 0)
120 opts.connection_count = 0;
122 char* tok = strtok(argv[count],
" ");
126 strcpy(
opts.connections[
opts.connection_count], tok);
127 opts.connection_count++;
128 tok = strtok(NULL,
" ");
134 else if (strcmp(argv[count],
"--control_connection") == 0)
141 else if (strcmp(argv[count],
"--clientid") == 0)
148 else if (strcmp(argv[count],
"--username") == 0)
155 else if (strcmp(argv[count],
"--password") == 0)
162 else if (strcmp(argv[count],
"--persistent") == 0)
163 opts.persistence = 1;
164 else if (strcmp(argv[count],
"--verbose") == 0)
171 #define LOGA_ALWAYS 1 175 #include <sys/timeb.h> 180 #if defined(_WIN32) || defined(_WINDOWS) 190 #if defined(_WIN32) || defined(_WINDOWS) 194 gettimeofday(&ts, NULL);
195 localtime_r(&ts.tv_sec, &timeinfo);
197 strftime(msg_buf, 80,
"%Y%m%d %H%M%S", &timeinfo);
199 #if defined(_WIN32) || defined(_WINDOWS) 200 sprintf(&msg_buf[strlen(msg_buf)],
".%.3hu ", ts.millitm);
202 sprintf(&msg_buf[strlen(msg_buf)],
".%.3lu ", ts.tv_usec / 1000);
205 va_start(args, format);
206 vsnprintf(&msg_buf[strlen(msg_buf)],
sizeof(msg_buf) - strlen(msg_buf), format, args);
209 printf(
"%s\n", msg_buf);
215 #if defined(_WIN32) || defined(_WIN64) 218 usleep(milliseconds*1000);
222 #if defined(_WIN32) || defined(_WINDOWS) 223 #define START_TIME_TYPE DWORD 224 static DWORD start_time = 0;
227 return GetTickCount();
230 #define START_TIME_TYPE struct timespec 233 static struct timespec start;
234 clock_gettime(CLOCK_REALTIME, &start);
238 #define START_TIME_TYPE struct timeval 242 struct timeval start_time;
243 gettimeofday(&start_time, NULL);
251 return GetTickCount() - start_time;
255 long elapsed(
struct timespec start)
257 struct timespec now, res;
259 clock_gettime(CLOCK_REALTIME, &now);
260 ntimersub(now, start, res);
261 return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
266 struct timeval now, res;
268 gettimeofday(&now, NULL);
269 timersub(&now, &start_time, &res);
270 return (res.tv_sec)*1000 + (res.tv_usec)/1000;
312 if (strncmp(m->
payload,
"stop", 4) == 0)
317 else if (wait_message != NULL && strncmp(wait_message, m->
payload,
318 strlen(wait_message)) == 0)
344 sprintf(buf,
"%s: %s",
opts.clientid, message);
363 sprintf(buf,
"waiting for: %s", message);
408 token = strtok(m->
payload,
" ");
409 token = strtok(NULL,
" ");
410 token = strtok(NULL,
" ");
425 "Error, expecting sequence number %d but got message id %d, payload was %.*s",
429 "Error, expecting sequence number %d but got %d message id %d",
457 MyLog(
LOGA_ALWAYS,
"Failed to reconnect with return code %d", (response) ? response->
code : -9999);
480 MyLog(
LOGA_ALWAYS,
"Connection lost when %d messages arrived out of %d expected",
484 if (
opts.persistence)
495 if (
opts.connections)
526 #if !defined(_WINDOWS) 581 "Workload test failed because the wrong number of messages" 582 " was received: %d whereas %d were expected",
616 if (++wait_count > limit ||
stopping)
646 int test_interval = 30;
663 sprintf(payload,
"message number %d", i);
721 sprintf(payload,
"message number %d", seqno);
726 MyLog(
LOGA_INFO,
"Rc %d from publish with payload %s, retrying", rc, payload);
824 if (
opts.persistence)
858 if (
opts.connections)
897 goto disconnect_exit;
929 MyLog(
LOGA_DEBUG,
"In control subscribe onSuccess callback %p granted qos %d", c, response->
alt.
qos);
965 printf(
"Trace : %d, %s\n", level, message);
968 int main(
int argc,
char** argv)
972 static char topic_buf[200];
976 signal(SIGPIPE, SIG_IGN);
989 sprintf(topic_buf,
"%s_%d",
opts.topic,
opts.slot_no);
990 opts.topic = topic_buf;
992 sprintf(clientid,
"%s_%d",
opts.clientid,
opts.slot_no);
void client_onFailure(void *context, MQTTAsync_failureData *response)
MQTTAsync_onFailure * onFailure
long last_completion_time
void control_connectionLost(void *context, char *cause)
void client_onCleanedDisconnected(void *context, MQTTAsync_successData *response)
int recreateReconnect(void)
FMT_INLINE std::basic_string< Char > format(const S &format_str, Args &&...args)
void client_onConnect(void *context, MQTTAsync_successData *response)
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m)
void client_onReconnectFailure(void *context, MQTTAsync_failureData *response)
union MQTTAsync_successData::@46 alt
#define MQTTAsync_responseOptions_initializer
static int control_subscribed
static char sub_topic[200]
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
int main(int argc, char **argv)
MQTTAsync_nameValue * MQTTAsync_getVersionInfo(void)
size_t strftime(char *str, size_t count, const char *format, const std::tm *time)
void MQTTAsync_free(void *memory)
void MQTTAsync_freeMessage(MQTTAsync_message **message)
#define MQTTASYNC_DISCONNECTED
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback *callback)
void client_onCleaned(void *context, MQTTAsync_successData *response)
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
int control_which(char *message1, char *message2)
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
void getopts(int argc, char **argv)
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
long elapsed(START_TIME_TYPE start_time)
#define MQTTAsync_disconnectOptions_initializer
MQTTAsync_onFailure * onFailure
void connectionLost(void *context, char *cause)
void messageSent(void *context, MQTTAsync_successData *response)
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
MQTTAsync_onFailure * onFailure
void MyLog(int LOGA_level, char *format,...)
#define MQTTAsync_connectOptions_initializer
MQTTAsync_onSuccess * onSuccess
int waitForCompletion(START_TIME_TYPE start_time)
int control_send(char *message)
#define MQTTCLIENT_PERSISTENCE_DEFAULT
int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen, const void *payload, int qos, int retained, MQTTAsync_responseOptions *response)
void control_onConnect(void *context, MQTTAsync_successData *response)
void MQTTAsync_destroy(MQTTAsync *handle)
MQTTAsync_connectOptions conn_opts
static char pub_topic[200]
void client_onSubscribe(void *context, MQTTAsync_successData *response)
#define MQTTASYNC_SUCCESS
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
#define MQTTCLIENT_PERSISTENCE_NONE
int control_messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m)
void MySleep(long milliseconds)
START_TIME_TYPE global_start_time
void client_onReconnect(void *context, MQTTAsync_successData *response)
void control_onSubscribe(void *context, MQTTAsync_successData *response)
char * control_connection
MQTTAsync_onSuccess * onSuccess
int control_wait(char *message)
static int client_subscribed
void control_onFailure(void *context, MQTTAsync_failureData *response)
MQTTAsync_onSuccess * onSuccess
START_TIME_TYPE start_clock(void)