mqttasync_module.c
Go to the documentation of this file.
1 #include <Python.h>
2 
3 #include "MQTTAsync.h"
4 #include "LinkedList.h"
5 
6 static PyObject *MqttV3Error;
7 
8 
9 static PyObject* mqttv3_create(PyObject* self, PyObject *args)
10 {
11  MQTTAsync c;
12  char* serverURI;
13  char* clientId;
14  int persistence_option = MQTTCLIENT_PERSISTENCE_DEFAULT;
15  PyObject *pyoptions = NULL, *temp;
17  int rc;
18 
19  if (!PyArg_ParseTuple(args, "ss|iO", &serverURI, &clientId,
20  &persistence_option, &pyoptions))
21  return NULL;
22 
23  if (persistence_option != MQTTCLIENT_PERSISTENCE_DEFAULT
24  && persistence_option != MQTTCLIENT_PERSISTENCE_NONE)
25  {
26  PyErr_SetString(PyExc_TypeError, "persistence must be DEFAULT or NONE");
27  return NULL;
28  }
29 
30  if (pyoptions)
31  {
32 
33  if (!PyDict_Check(pyoptions))
34  {
35  PyErr_SetString(PyExc_TypeError,
36  "Create options parameter must be a dictionary");
37  return NULL;
38  }
39 
40  if ((temp = PyDict_GetItemString(pyoptions, "sendWhileDisconnected"))
41  != NULL)
42  {
43  if (PyInt_Check(temp))
44  options.sendWhileDisconnected = (int) PyInt_AsLong(temp);
45  else
46  {
47  PyErr_SetString(PyExc_TypeError, "sendWhileDisconnected value must be int");
48  return NULL;
49  }
50  }
51 
52  if ((temp = PyDict_GetItemString(pyoptions, "maxBufferedMessages"))
53  != NULL)
54  {
55  if (PyInt_Check(temp))
56  options.maxBufferedMessages = (int) PyInt_AsLong(temp);
57  else
58  {
59  PyErr_SetString(PyExc_TypeError, "maxBufferedMessages value must be int");
60  return NULL;
61  }
62  }
63  rc = MQTTAsync_createWithOptions(&c, serverURI, clientId, persistence_option, NULL, &options);
64  }
65  else
66  rc = MQTTAsync_create(&c, serverURI, clientId, persistence_option, NULL);
67 
68 
69  return Py_BuildValue("ik", rc, c);
70 }
71 
72 
73 static List* callbacks = NULL;
74 static List* connected_callbacks = NULL;
75 
77 {
79 };
80 
81 typedef struct
82 {
84  PyObject *context;
85  PyObject *cl, *ma, *dc;
87 
88 
89 typedef struct
90 {
92  PyObject *context;
93  PyObject *co;
95 
96 
97 int clientCompare(void* a, void* b)
98 {
99  CallbackEntry* e = (CallbackEntry*) a;
100  return e->c == (MQTTAsync) b;
101 }
102 
103 
104 int connectedCompare(void* a, void* b)
105 {
106  ConnectedEntry* e = (ConnectedEntry*) a;
107  return e->c == (MQTTAsync) b;
108 }
109 
110 
111 void connected(void* context, char* cause)
112 {
113  /* call the right Python function, using the context */
114  PyObject *arglist;
115  PyObject *result;
116  ConnectedEntry* e = context;
117  PyGILState_STATE gstate;
118 
119  gstate = PyGILState_Ensure();
120  arglist = Py_BuildValue("Os", e->context, cause);
121  result = PyEval_CallObject(e->co, arglist);
122  Py_DECREF(arglist);
123  PyGILState_Release(gstate);
124 }
125 
126 
127 void connectionLost(void* context, char* cause)
128 {
129  /* call the right Python function, using the context */
130  PyObject *arglist;
131  PyObject *result;
132  CallbackEntry* e = context;
133  PyGILState_STATE gstate;
134 
135  gstate = PyGILState_Ensure();
136  arglist = Py_BuildValue("Os", e->context, cause);
137  result = PyEval_CallObject(e->cl, arglist);
138  Py_DECREF(arglist);
139  PyGILState_Release(gstate);
140 }
141 
142 
143 int messageArrived(void* context, char* topicName, int topicLen,
144  MQTTAsync_message* message)
145 {
146  PyObject *result = NULL;
147  CallbackEntry* e = context;
148  int rc = -99;
149  PyGILState_STATE gstate;
150 
151  gstate = PyGILState_Ensure();
152  if (topicLen == 0)
153  result = PyObject_CallFunction(e->ma, "Os{ss#sisisisi}", e->context,
154  topicName, "payload", message->payload, message->payloadlen,
155  "qos", message->qos, "retained", message->retained, "dup",
156  message->dup, "msgid", message->msgid);
157  else
158  result = PyObject_CallFunction(e->ma, "Os#{ss#sisisisi}", e->context,
159  topicName, topicLen, "payload", message->payload,
160  message->payloadlen, "qos", message->qos, "retained",
161  message->retained, "dup", message->dup, "msgid",
162  message->msgid);
163  if (result)
164  {
165  if (PyInt_Check(result))
166  rc = (int) PyInt_AsLong(result);
167  Py_DECREF(result);
168  }
169  PyGILState_Release(gstate);
170  MQTTAsync_free(topicName);
171  MQTTAsync_freeMessage(&message);
172  return rc;
173 }
174 
175 
177 {
178  PyObject *arglist;
179  PyObject *result;
180  CallbackEntry* e = context;
181  PyGILState_STATE gstate;
182 
183  gstate = PyGILState_Ensure();
184  arglist = Py_BuildValue("Oi", e->context, dt);
185  result = PyEval_CallObject(e->dc, arglist);
186  Py_DECREF(arglist);
187  PyGILState_Release(gstate);
188 }
189 
190 
191 static PyObject* mqttv3_setcallbacks(PyObject* self, PyObject *args)
192 {
193  MQTTAsync c;
194  CallbackEntry* e = NULL;
195  int rc;
196 
197  e = malloc(sizeof(CallbackEntry));
198 
199  if (!PyArg_ParseTuple(args, "kOOOO", &c, (PyObject**) &e->context, &e->cl,
200  &e->ma, &e->dc))
201  return NULL;
202  e->c = c;
203 
204  if ((e->cl != Py_None && !PyCallable_Check(e->cl))
205  || (e->ma != Py_None && !PyCallable_Check(e->ma))
206  || (e->dc != Py_None && !PyCallable_Check(e->dc)))
207  {
208  PyErr_SetString(PyExc_TypeError,
209  "3rd, 4th and 5th parameters must be callable or None");
210  return NULL;
211  }
212 
213  Py_BEGIN_ALLOW_THREADS
215  Py_END_ALLOW_THREADS
216 
217  if (rc == MQTTASYNC_SUCCESS)
218  {
219  ListElement* temp = NULL;
220  if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
221  {
222  ListDetach(callbacks, temp->content);
223  free(temp->content);
224  }
225  ListAppend(callbacks, e, sizeof(e));
226  Py_XINCREF(e->cl);
227  Py_XINCREF(e->ma);
228  Py_XINCREF(e->dc);
229  Py_XINCREF(e->context);
230  }
231 
232  return Py_BuildValue("i", rc);
233 }
234 
235 
236 static PyObject* mqttv3_setconnected(PyObject* self, PyObject *args)
237 {
238  MQTTAsync c;
239  ConnectedEntry* e = NULL;
240  int rc;
241 
242  e = malloc(sizeof(ConnectedEntry));
243 
244  if (!PyArg_ParseTuple(args, "kOO", &c, (PyObject**) &e->context, &e->co))
245  return NULL;
246  e->c = c;
247 
248  if (e->co != Py_None && !PyCallable_Check(e->co))
249  {
250  PyErr_SetString(PyExc_TypeError,
251  "3rd parameter must be callable or None");
252  return NULL;
253  }
254 
255  Py_BEGIN_ALLOW_THREADS
256  rc = MQTTAsync_setConnected(c, e, connected);
257  Py_END_ALLOW_THREADS
258 
259  if (rc == MQTTASYNC_SUCCESS)
260  {
261  ListElement* temp = NULL;
262  if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
263  {
264  ListDetach(connected_callbacks, temp->content);
265  free(temp->content);
266  }
267  ListAppend(connected_callbacks, e, sizeof(e));
268  Py_XINCREF(e->co);
269  Py_XINCREF(e->context);
270  }
271 
272  return Py_BuildValue("i", rc);
273 }
274 
275 
276 typedef struct
277 {
279  PyObject *context;
280  PyObject *onSuccess, *onFailure;
281  enum msgTypes msgType;
282 } ResponseEntry;
283 
284 
286 {
287  PyObject *result = NULL;
288  ResponseEntry* e = context;
289  PyGILState_STATE gstate;
290 
291  gstate = PyGILState_Ensure();
292  switch (e->msgType)
293  {
294  case CONNECT:
295  result = PyObject_CallFunction(e->onSuccess, "O{sisiss}", (e->context) ? e->context : Py_None,
296  "MQTTVersion", response->alt.connect.MQTTVersion,
297  "sessionPresent", response->alt.connect.sessionPresent,
298  "serverURI", response->alt.connect.serverURI);
299  break;
300 
301  case PUBLISH:
302  result = PyObject_CallFunction(e->onSuccess, "O{si ss s{ss# sisi}}", (e->context) ? e->context : Py_None,
303  "token", response->token,
304  "destinationName", response->alt.pub.destinationName,
305  "message",
306  "payload", response->alt.pub.message.payload,
307  response->alt.pub.message.payloadlen,
308  "qos", response->alt.pub.message.qos,
309  "retained", response->alt.pub.message.retained);
310  break;
311 
312  case SUBSCRIBE:
313  result = PyObject_CallFunction(e->onSuccess, "O{sisi}", (e->context) ? e->context : Py_None,
314  "token", response->token,
315  "qos", response->alt.qos);
316  break;
317 
318  case SUBSCRIBE_MANY:
319  result = PyObject_CallFunction(e->onSuccess, "O{sis[i]}", (e->context) ? e->context : Py_None,
320  "token", response->token,
321  "qosList", response->alt.qosList[0]);
322  break;
323 
324  case UNSUBSCRIBE:
325  result = PyObject_CallFunction(e->onSuccess, "O{si}", (e->context) ? e->context : Py_None,
326  "token", response->token);
327  break;
328  }
329  if (result)
330  {
331  Py_DECREF(result);
332  printf("decrementing reference count for result\n");
333  }
334  PyGILState_Release(gstate);
335  free(e);
336 }
337 
338 
340 {
341  PyObject *result = NULL;
342  PyObject *arglist = NULL;
343  ResponseEntry* e = context;
344  PyGILState_STATE gstate;
345 
346  gstate = PyGILState_Ensure();
347 
348  // TODO: convert response into Python structure
349  if (e->context)
350  arglist = Py_BuildValue("OO", e->context, response);
351  else
352  arglist = Py_BuildValue("OO", Py_None, response);
353 
354  result = PyEval_CallObject(e->onFailure, arglist);
355  Py_DECREF(arglist);
356  PyGILState_Release(gstate);
357  free(e);
358 }
359 
360 
361 /* return true if ok, false otherwise */
362 int getResponseOptions(MQTTAsync c, PyObject *pyoptions, MQTTAsync_responseOptions* responseOptions,
363  enum msgTypes msgType)
364 {
365  PyObject *temp = NULL;
366 
367  if (!pyoptions)
368  return 1;
369 
370  if (!PyDict_Check(pyoptions))
371  {
372  PyErr_SetString(PyExc_TypeError, "Response options must be a dictionary");
373  return 0;
374  }
375 
376  if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
377  {
378  if (PyCallable_Check(temp)) /* temp points to Python function */
379  responseOptions->onSuccess = (MQTTAsync_onSuccess*)temp;
380  else
381  {
382  PyErr_SetString(PyExc_TypeError,
383  "onSuccess value must be callable");
384  return 0;
385  }
386  }
387 
388  if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
389  {
390  if (PyCallable_Check(temp))
391  responseOptions->onFailure = (MQTTAsync_onFailure*)temp;
392  else
393  {
394  PyErr_SetString(PyExc_TypeError,
395  "onFailure value must be callable");
396  return 0;
397  }
398  }
399 
400  responseOptions->context = PyDict_GetItemString(pyoptions, "context");
401 
402  if (responseOptions->onFailure || responseOptions->onSuccess)
403  {
404  ResponseEntry* r = malloc(sizeof(ResponseEntry));
405  r->c = c;
406  r->context = responseOptions->context;
407  responseOptions->context = r;
408  r->onSuccess = (PyObject*)responseOptions->onSuccess;
409  responseOptions->onSuccess = onSuccess;
410  r->onFailure = (PyObject*)responseOptions->onFailure;
411  responseOptions->onFailure = onFailure;
412  r->msgType = msgType;
413  }
414 
415  return 1; /* not an error, if we get here */
416 }
417 
418 
419 static PyObject* mqttv3_connect(PyObject* self, PyObject *args)
420 {
421  MQTTAsync c;
422  PyObject *pyoptions = NULL, *temp;
425  int rc;
426 
427  if (!PyArg_ParseTuple(args, "k|O", &c, &pyoptions))
428  return NULL;
429 
430  if (!pyoptions)
431  goto skip;
432 
433  if (!PyDict_Check(pyoptions))
434  {
435  PyErr_SetString(PyExc_TypeError, "2nd parameter must be a dictionary");
436  return NULL;
437  }
438 
439  if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
440  {
441  if (PyCallable_Check(temp)) /* temp points to Python function */
442  connectOptions.onSuccess = (MQTTAsync_onSuccess*)temp;
443  else
444  {
445  PyErr_SetString(PyExc_TypeError,
446  "onSuccess value must be callable");
447  return NULL;
448  }
449  }
450 
451  if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
452  {
453  if (PyCallable_Check(temp))
454  connectOptions.onFailure = (MQTTAsync_onFailure*)temp;
455  else
456  {
457  PyErr_SetString(PyExc_TypeError,
458  "onFailure value must be callable");
459  return NULL;
460  }
461  }
462 
463  connectOptions.context = PyDict_GetItemString(pyoptions, "context");
464 
465  if (connectOptions.onFailure || connectOptions.onSuccess)
466  {
467  ResponseEntry* r = malloc(sizeof(ResponseEntry));
468  r->c = c;
469  r->context = connectOptions.context;
470  connectOptions.context = r;
471  r->onSuccess = (PyObject*)connectOptions.onSuccess;
472  connectOptions.onSuccess = onSuccess;
473  r->onFailure = (PyObject*)connectOptions.onFailure;
474  connectOptions.onFailure = onFailure;
475  r->msgType = CONNECT;
476  }
477 
478  if ((temp = PyDict_GetItemString(pyoptions, "keepAliveInterval")) != NULL)
479  {
480  if (PyInt_Check(temp))
481  connectOptions.keepAliveInterval = (int) PyInt_AsLong(temp);
482  else
483  {
484  PyErr_SetString(PyExc_TypeError,
485  "keepAliveLiveInterval value must be int");
486  return NULL;
487  }
488  }
489 
490  if ((temp = PyDict_GetItemString(pyoptions, "cleansession")) != NULL)
491  {
492  if (PyInt_Check(temp))
493  connectOptions.cleansession = (int) PyInt_AsLong(temp);
494  else
495  {
496  PyErr_SetString(PyExc_TypeError, "cleansession value must be int");
497  return NULL;
498  }
499  }
500 
501  if ((temp = PyDict_GetItemString(pyoptions, "will")) != NULL)
502  {
503  if (PyDict_Check(temp))
504  {
505  PyObject *wtemp = NULL;
506  if ((wtemp = PyDict_GetItemString(temp, "topicName")) == NULL)
507  {
508  PyErr_SetString(PyExc_TypeError,
509  "will topicName value must be set");
510  return NULL;
511  }
512  else
513  {
514  if (PyString_Check(wtemp))
515  willOptions.topicName = PyString_AsString(wtemp);
516  else
517  {
518  PyErr_SetString(PyExc_TypeError,
519  "will topicName value must be string");
520  return NULL;
521  }
522  }
523  if ((wtemp = PyDict_GetItemString(temp, "message")) == NULL)
524  {
525  PyErr_SetString(PyExc_TypeError,
526  "will message value must be set");
527  return NULL;
528  }
529  else
530  {
531  if (PyString_Check(wtemp))
532  willOptions.message = PyString_AsString(wtemp);
533  else
534  {
535  PyErr_SetString(PyExc_TypeError,
536  "will message value must be string");
537  return NULL;
538  }
539  }
540  if ((wtemp = PyDict_GetItemString(temp, "retained")) != NULL)
541  {
542  if (PyInt_Check(wtemp))
543  willOptions.retained = (int) PyInt_AsLong(wtemp);
544  else
545  {
546  PyErr_SetString(PyExc_TypeError,
547  "will retained value must be int");
548  return NULL;
549  }
550  }
551  if ((wtemp = PyDict_GetItemString(temp, "qos")) != NULL)
552  {
553  if (PyInt_Check(wtemp))
554  willOptions.qos = (int) PyInt_AsLong(wtemp);
555  else
556  {
557  PyErr_SetString(PyExc_TypeError,
558  "will qos value must be int");
559  return NULL;
560  }
561  }
562  connectOptions.will = &willOptions;
563  }
564  else
565  {
566  PyErr_SetString(PyExc_TypeError, "will value must be dictionary");
567  return NULL;
568  }
569  }
570 
571  if ((temp = PyDict_GetItemString(pyoptions, "username")) != NULL)
572  {
573  if (PyString_Check(temp))
574  connectOptions.username = PyString_AsString(temp);
575  else
576  {
577  PyErr_SetString(PyExc_TypeError, "username value must be string");
578  return NULL;
579  }
580  }
581 
582  if ((temp = PyDict_GetItemString(pyoptions, "password")) != NULL)
583  {
584  if (PyString_Check(temp))
585  connectOptions.username = PyString_AsString(temp);
586  else
587  {
588  PyErr_SetString(PyExc_TypeError, "password value must be string");
589  return NULL;
590  }
591  }
592 
593  if ((temp = PyDict_GetItemString(pyoptions, "automaticReconnect")) != NULL)
594  {
595  if (PyInt_Check(temp))
596  connectOptions.automaticReconnect = (int) PyInt_AsLong(temp);
597  else
598  {
599  PyErr_SetString(PyExc_TypeError, "automatic reconnect value must be int");
600  return NULL;
601  }
602  }
603 
604  if ((temp = PyDict_GetItemString(pyoptions, "minRetryInterval")) != NULL)
605  {
606  if (PyInt_Check(temp))
607  connectOptions.minRetryInterval = (int) PyInt_AsLong(temp);
608  else
609  {
610  PyErr_SetString(PyExc_TypeError, "minRetryInterval value must be int");
611  return NULL;
612  }
613  }
614 
615  if ((temp = PyDict_GetItemString(pyoptions, "maxRetryInterval")) != NULL)
616  {
617  if (PyInt_Check(temp))
618  connectOptions.maxRetryInterval = (int) PyInt_AsLong(temp);
619  else
620  {
621  PyErr_SetString(PyExc_TypeError, "maxRetryInterval value must be int");
622  return NULL;
623  }
624  }
625 
626 skip:
627  Py_BEGIN_ALLOW_THREADS
628  rc = MQTTAsync_connect(c, &connectOptions);
629  Py_END_ALLOW_THREADS
630  return Py_BuildValue("i", rc);
631 }
632 
633 
634 static PyObject* mqttv3_disconnect(PyObject* self, PyObject *args)
635 {
636  MQTTAsync c;
638  int rc;
639 
640  if (!PyArg_ParseTuple(args, "k|i", &c, &options.timeout))
641  return NULL;
642 
643  Py_BEGIN_ALLOW_THREADS
644  rc = MQTTAsync_disconnect(c, &options);
645  Py_END_ALLOW_THREADS
646  return Py_BuildValue("i", rc);
647 }
648 
649 
650 static PyObject* mqttv3_isConnected(PyObject* self, PyObject *args)
651 {
652  MQTTAsync c;
653  int rc;
654 
655  if (!PyArg_ParseTuple(args, "k", &c))
656  return NULL;
657  Py_BEGIN_ALLOW_THREADS
658  rc = MQTTAsync_isConnected(c);
659  Py_END_ALLOW_THREADS
660  return Py_BuildValue("i", rc);
661 }
662 
663 
664 static PyObject* mqttv3_subscribe(PyObject* self, PyObject *args)
665 {
666  MQTTAsync c;
668  PyObject *pyoptions = NULL;
669  char* topic;
670  int qos = 2;
671  int rc;
672 
673  if (!PyArg_ParseTuple(args, "ks|iO", &c, &topic, &qos, &pyoptions))
674  return NULL;
675 
676  if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
677  return NULL;
678 
679  Py_BEGIN_ALLOW_THREADS;
680  rc = MQTTAsync_subscribe(c, topic, qos, &response);
681  Py_END_ALLOW_THREADS;
682  return Py_BuildValue("i", rc);
683 }
684 
685 
686 static PyObject* mqttv3_subscribeMany(PyObject* self, PyObject *args)
687 {
688  MQTTAsync c;
690  PyObject* topicList;
691  PyObject* qosList;
692  PyObject *pyoptions = NULL;
693 
694  int count;
695  char** topics;
696  int* qoss;
697 
698  int i, rc = 0;
699 
700  if (!PyArg_ParseTuple(args, "kOO|O", &c, &topicList, &qosList, &pyoptions))
701  return NULL;
702 
703  if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
704  return NULL;
705 
706  if (!PySequence_Check(topicList) || !PySequence_Check(qosList))
707  {
708  PyErr_SetString(PyExc_TypeError,
709  "3rd and 4th parameters must be sequences");
710  return NULL;
711  }
712 
713  if ((count = PySequence_Length(topicList)) != PySequence_Length(qosList))
714  {
715  PyErr_SetString(PyExc_TypeError,
716  "3rd and 4th parameters must be sequences of the same length");
717  return NULL;
718  }
719 
720  topics = malloc(count * sizeof(char*));
721  for (i = 0; i < count; ++i)
722  topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
723 
724  qoss = malloc(count * sizeof(int));
725  for (i = 0; i < count; ++i)
726  qoss[i] = (int) PyInt_AsLong(PySequence_GetItem(qosList, i));
727 
728  Py_BEGIN_ALLOW_THREADS
729  rc = MQTTAsync_subscribeMany(c, count, topics, qoss, &response);
730  Py_END_ALLOW_THREADS
731 
732  for (i = 0; i < count; ++i)
733  PySequence_SetItem(qosList, i, PyInt_FromLong((long) qoss[i]));
734 
735  free(topics);
736  free(qoss);
737 
738  if (rc == MQTTASYNC_SUCCESS)
739  return Py_BuildValue("iO", rc, qosList);
740  else
741  return Py_BuildValue("i", rc);
742 }
743 
744 
745 static PyObject* mqttv3_unsubscribe(PyObject* self, PyObject *args)
746 {
747  MQTTAsync c;
749  PyObject *pyoptions = NULL;
750  char* topic;
751  int rc;
752 
753  if (!PyArg_ParseTuple(args, "ks|O", &c, &topic, &pyoptions))
754  return NULL;
755 
756  if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
757  return NULL;
758 
759  Py_BEGIN_ALLOW_THREADS
760  rc = MQTTAsync_unsubscribe(c, topic, &response);
761  Py_END_ALLOW_THREADS
762  return Py_BuildValue("i", rc);
763 }
764 
765 
766 static PyObject* mqttv3_unsubscribeMany(PyObject* self, PyObject *args)
767 {
768  MQTTAsync c;
770  PyObject* topicList;
771  PyObject *pyoptions = NULL;
772 
773  int count;
774  char** topics;
775 
776  int i, rc = 0;
777 
778  if (!PyArg_ParseTuple(args, "kO|O", &c, &topicList, &pyoptions))
779  return NULL;
780 
781  if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
782  return NULL;
783 
784  if (!PySequence_Check(topicList))
785  {
786  PyErr_SetString(PyExc_TypeError, "3rd parameter must be sequences");
787  return NULL;
788  }
789 
790  count = PySequence_Length(topicList);
791  topics = malloc(count * sizeof(char*));
792  for (i = 0; i < count; ++i)
793  topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
794 
795  Py_BEGIN_ALLOW_THREADS
796  rc = MQTTAsync_unsubscribeMany(c, count, topics, &response);
797  Py_END_ALLOW_THREADS
798 
799  free(topics);
800 
801  return Py_BuildValue("i", rc);
802 }
803 
804 
805 static PyObject* mqttv3_send(PyObject* self, PyObject *args)
806 {
807  MQTTAsync c;
808  char* destinationName;
809  int payloadlen;
810  void* payload;
811  int qos = 0;
812  int retained = 0;
814  PyObject *pyoptions = NULL;
815  int rc;
816 
817  if (!PyArg_ParseTuple(args, "kss#|iiO", &c, &destinationName, &payload,
818  &payloadlen, &qos, &retained, &pyoptions))
819  return NULL;
820 
821  if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
822  return NULL;
823 
824  Py_BEGIN_ALLOW_THREADS
825  rc = MQTTAsync_send(c, destinationName, payloadlen, payload, qos, retained, &response);
826  Py_END_ALLOW_THREADS
827 
828  if (rc == MQTTASYNC_SUCCESS && qos > 0)
829  return Py_BuildValue("ii", rc, response);
830  else
831  return Py_BuildValue("i", rc);
832 }
833 
834 
835 static PyObject* mqttv3_sendMessage(PyObject* self, PyObject *args)
836 {
837  MQTTAsync c;
838  char* destinationName;
839  PyObject *message, *temp;
842  PyObject *pyoptions = NULL;
843  int rc;
844 
845  if (!PyArg_ParseTuple(args, "ksO|O", &c, &destinationName, &message, &pyoptions))
846  return NULL;
847 
848  if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
849  return NULL;
850 
851  if (!PyDict_Check(message))
852  {
853  PyErr_SetString(PyExc_TypeError, "3rd parameter must be a dictionary");
854  return NULL;
855  }
856 
857  if ((temp = PyDict_GetItemString(message, "payload")) == NULL)
858  {
859  PyErr_SetString(PyExc_TypeError, "dictionary must have payload key");
860  return NULL;
861  }
862 
863  if (PyString_Check(temp))
864  PyString_AsStringAndSize(temp, (char**) &msg.payload,
865  (Py_ssize_t*) &msg.payloadlen);
866  else
867  {
868  PyErr_SetString(PyExc_TypeError, "payload value must be string");
869  return NULL;
870  }
871 
872  if ((temp = PyDict_GetItemString(message, "qos")) == NULL)
873  msg.qos = (int) PyInt_AsLong(temp);
874 
875  if ((temp = PyDict_GetItemString(message, "retained")) == NULL)
876  msg.retained = (int) PyInt_AsLong(temp);
877 
878  Py_BEGIN_ALLOW_THREADS
879  rc = MQTTAsync_sendMessage(c, destinationName, &msg, &response);
880  Py_END_ALLOW_THREADS
881 
882  if (rc == MQTTASYNC_SUCCESS && msg.qos > 0)
883  return Py_BuildValue("ii", rc, response);
884  else
885  return Py_BuildValue("i", rc);
886 }
887 
888 
889 static PyObject* mqttv3_waitForCompletion(PyObject* self, PyObject *args)
890 {
891  MQTTAsync c;
892  unsigned long timeout = 1000L;
893  MQTTAsync_token dt;
894  int rc;
895 
896  if (!PyArg_ParseTuple(args, "ki|i", &c, &dt, &timeout))
897  return NULL;
898 
899  Py_BEGIN_ALLOW_THREADS
900  rc = MQTTAsync_waitForCompletion(c, dt, timeout);
901  Py_END_ALLOW_THREADS
902 
903  return Py_BuildValue("i", rc);
904 }
905 
906 
907 static PyObject* mqttv3_getPendingTokens(PyObject* self, PyObject *args)
908 {
909  MQTTAsync c;
910  MQTTAsync_token* tokens;
911  int rc;
912 
913  if (!PyArg_ParseTuple(args, "k", &c))
914  return NULL;
915 
916  Py_BEGIN_ALLOW_THREADS
917  rc = MQTTAsync_getPendingTokens(c, &tokens);
918  Py_END_ALLOW_THREADS
919 
920  if (rc == MQTTASYNC_SUCCESS)
921  {
922  int i = 0;
923  PyObject* dts = PyList_New(0);
924 
925  while (tokens[i] != -1)
926  PyList_Append(dts, PyInt_FromLong((long) tokens[i]));
927 
928  return Py_BuildValue("iO", rc, dts);
929  }
930  else
931  return Py_BuildValue("i", rc);
932 }
933 
934 
935 static PyObject* mqttv3_destroy(PyObject* self, PyObject *args)
936 {
937  MQTTAsync c;
938  ListElement* temp = NULL;
939 
940  if (!PyArg_ParseTuple(args, "k", &c))
941  return NULL;
942 
943  if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
944  {
945  ListDetach(callbacks, temp->content);
946  free(temp->content);
947  }
948 
949  if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
950  {
951  ListDetach(connected_callbacks, temp->content);
952  free(temp->content);
953  }
954 
955  MQTTAsync_destroy(&c);
956 
957  Py_INCREF(Py_None);
958  return Py_None;
959 }
960 
961 
962 static PyMethodDef MqttV3Methods[] =
963  {
964  { "create", mqttv3_create, METH_VARARGS, "Create an MQTTv3 client." },
965  { "setcallbacks", mqttv3_setcallbacks, METH_VARARGS,
966  "Sets the callback functions for a particular client." },
967  { "setconnected", mqttv3_setconnected, METH_VARARGS,
968  "Sets the connected callback function for a particular client." },
969  { "connect", mqttv3_connect, METH_VARARGS,
970  "Connects to a server using the specified options." },
971  { "disconnect", mqttv3_disconnect, METH_VARARGS,
972  "Disconnects from a server." },
973  { "isConnected", mqttv3_isConnected, METH_VARARGS,
974  "Determines if this client is currently connected to the server." },
975  { "subscribe", mqttv3_subscribe, METH_VARARGS,
976  "Subscribe to the given topic." },
977  { "subscribeMany", mqttv3_subscribeMany, METH_VARARGS,
978  "Subscribe to the given topics." },
979  { "unsubscribe", mqttv3_unsubscribe, METH_VARARGS,
980  "Unsubscribe from the given topic." },
981  { "unsubscribeMany", mqttv3_unsubscribeMany, METH_VARARGS,
982  "Unsubscribe from the given topics." },
983  { "send", mqttv3_send, METH_VARARGS,
984  "Publish a message to the given topic." },
985  { "sendMessage", mqttv3_sendMessage, METH_VARARGS,
986  "Publish a message to the given topic." },
987  { "waitForCompletion", mqttv3_waitForCompletion, METH_VARARGS,
988  "Waits for the completion of the delivery of the message represented by a delivery token." },
989  { "getPendingTokens", mqttv3_getPendingTokens, METH_VARARGS,
990  "Returns the tokens pending of completion." },
991  { "destroy", mqttv3_destroy, METH_VARARGS,
992  "Free memory allocated to a MQTT client. It is the opposite to create." },
993  { NULL, NULL, 0, NULL } /* Sentinel */
994  };
995 
996 
997 PyMODINIT_FUNC initpaho_mqtt3a(void)
998 {
999  PyObject *m;
1000 
1001  PyEval_InitThreads();
1002 
1003  callbacks = ListInitialize();
1004  connected_callbacks = ListInitialize();
1005 
1006  m = Py_InitModule("paho_mqtt3a", MqttV3Methods);
1007  if (m == NULL)
1008  return;
1009 
1010  MqttV3Error = PyErr_NewException("paho_mqtt3a.error", NULL, NULL);
1011  Py_INCREF(MqttV3Error);
1012  PyModule_AddObject(m, "error", MqttV3Error);
1013 
1014  PyModule_AddIntConstant(m, "SUCCESS", MQTTASYNC_SUCCESS);
1015  PyModule_AddIntConstant(m, "FAILURE", MQTTASYNC_FAILURE);
1016  PyModule_AddIntConstant(m, "DISCONNECTED", MQTTASYNC_DISCONNECTED);
1017  PyModule_AddIntConstant(m, "MAX_MESSAGES_INFLIGHT", MQTTASYNC_MAX_MESSAGES_INFLIGHT);
1018  PyModule_AddIntConstant(m, "BAD_UTF8_STRING", MQTTASYNC_BAD_UTF8_STRING);
1019  PyModule_AddIntConstant(m, "BAD_NULL_PARAMETER", MQTTASYNC_NULL_PARAMETER);
1020  PyModule_AddIntConstant(m, "BAD_TOPICNAME_TRUNCATED", MQTTASYNC_TOPICNAME_TRUNCATED);
1021  PyModule_AddIntConstant(m, "PERSISTENCE_DEFAULT", MQTTCLIENT_PERSISTENCE_DEFAULT);
1022  PyModule_AddIntConstant(m, "PERSISTENCE_NONE", MQTTCLIENT_PERSISTENCE_NONE);
1023  PyModule_AddIntConstant(m, "PERSISTENCE_USER", MQTTCLIENT_PERSISTENCE_USER);
1024  PyModule_AddIntConstant(m, "PERSISTENCE_ERROR",
1026 }
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
void onSuccess(void *context, MQTTAsync_successData *response)
#define MQTTASYNC_NULL_PARAMETER
Definition: MQTTAsync.h:140
static PyObject * mqttv3_setcallbacks(PyObject *self, PyObject *args)
int MQTTAsync_createWithOptions(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTAsync_createOptions *options)
Definition: MQTTAsync.c:575
string topic
Definition: test2.py:8
PyObject * onSuccess
static List * connected_callbacks
union MQTTAsync_successData::@46 alt
const char * message
Definition: MQTTAsync.h:996
const char * topicName
Definition: MQTTAsync.h:994
#define MQTTCLIENT_PERSISTENCE_ERROR
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
#define MQTTASYNC_FAILURE
Definition: MQTTAsync.h:118
static PyObject * mqttv3_unsubscribeMany(PyObject *self, PyObject *args)
void connected(void *context, char *cause)
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
#define MQTTASYNC_BAD_UTF8_STRING
Definition: MQTTAsync.h:136
#define MQTTCLIENT_PERSISTENCE_USER
void * MQTTAsync
Definition: MQTTAsync.h:239
void MQTTAsync_free(void *memory)
Definition: MQTTAsync.c:2626
#define malloc(x)
Definition: Heap.h:41
void MQTTAsync_freeMessage(MQTTAsync_message **message)
Definition: MQTTAsync.c:2615
#define MQTTASYNC_DISCONNECTED
Definition: MQTTAsync.h:127
static PyObject * mqttv3_subscribe(PyObject *self, PyObject *args)
static PyObject * mqttv3_destroy(PyObject *self, PyObject *args)
int MQTTAsync_setConnected(MQTTAsync handle, void *context, MQTTAsync_connected *connected)
Definition: MQTTAsync.c:3178
int MQTTAsync_unsubscribe(MQTTAsync handle, const char *topic, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4209
static PyObject * mqttv3_connect(PyObject *self, PyObject *args)
struct MQTTAsync_successData::@46::@48 connect
int ListDetach(List *aList, void *content)
Definition: LinkedList.c:245
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
#define free(x)
Definition: Heap.h:55
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4121
void deliveryComplete(void *context, MQTTAsync_token dt)
static PyObject * mqttv3_disconnect(PyObject *self, PyObject *args)
#define MQTTAsync_willOptions_initializer
Definition: MQTTAsync.h:1014
constexpr size_t count()
Definition: core.h:960
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTAsync.c:737
int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
Definition: MQTTAsync.c:4737
static PyObject * mqttv3_getPendingTokens(PyObject *self, PyObject *args)
#define MQTTAsync_disconnectOptions_initializer
Definition: MQTTAsync.h:1422
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:702
int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char *const *topic, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4131
static PyObject * mqttv3_send(PyObject *self, PyObject *args)
msgTypes
Definition: MQTTPacket.h:46
int qos
Definition: test6.c:56
MQTTAsync_token token
Definition: MQTTAsync.h:549
#define MQTTAsync_createOptions_initializer
Definition: MQTTAsync.h:965
int MQTTAsync_isConnected(MQTTAsync handle)
Definition: MQTTAsync.c:3932
static PyObject * mqttv3_create(PyObject *self, PyObject *args)
int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char *const *topic, int *qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4004
int getResponseOptions(MQTTAsync c, PyObject *pyoptions, MQTTAsync_responseOptions *responseOptions, enum msgTypes msgType)
ListElement * ListAppend(List *aList, void *content, size_t size)
Definition: LinkedList.c:90
#define MQTTAsync_connectOptions_initializer
Definition: MQTTAsync.h:1335
int MQTTAsync_token
Definition: MQTTAsync.h:249
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
int connectedCompare(void *a, void *b)
MQTTAsync_willOptions * will
Definition: MQTTAsync.h:1214
static PyObject * mqttv3_subscribeMany(PyObject *self, PyObject *args)
#define MQTTCLIENT_PERSISTENCE_DEFAULT
int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen, const void *payload, int qos, int retained, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4230
void MQTTAsync_onSuccess(void *context, MQTTAsync_successData *response)
Definition: MQTTAsync.h:631
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
static PyObject * mqttv3_sendMessage(PyObject *self, PyObject *args)
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
static PyMethodDef MqttV3Methods[]
int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const MQTTAsync_message *message, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4328
MQTTClient c
Definition: test10.c:1656
PyObject * context
int retained
Definition: test6.c:57
dictionary context
Definition: test2.py:57
PyObject * context
static PyObject * mqttv3_isConnected(PyObject *self, PyObject *args)
struct MQTTAsync_successData::@46::@47 pub
#define MQTTCLIENT_PERSISTENCE_NONE
PyObject * onFailure
void connectionLost(void *context, char *cause)
static PyObject * mqttv3_waitForCompletion(PyObject *self, PyObject *args)
void MQTTAsync_onFailure(void *context, MQTTAsync_failureData *response)
Definition: MQTTAsync.h:662
List * ListInitialize(void)
Definition: LinkedList.c:52
PyMODINIT_FUNC initpaho_mqtt3a(void)
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
Definition: LinkedList.c:154
enum msgTypes msgType
#define MQTTASYNC_TOPICNAME_TRUNCATED
Definition: MQTTAsync.h:146
#define MQTTAsync_message_initializer
Definition: MQTTAsync.h:319
char * topics[]
enum MQTTReasonCodes rc
Definition: test10.c:1112
static PyObject * mqttv3_unsubscribe(PyObject *self, PyObject *args)
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
int clientCompare(void *a, void *b)
#define MQTTASYNC_MAX_MESSAGES_INFLIGHT
Definition: MQTTAsync.h:132
void onFailure(void *context, MQTTAsync_failureData *response)
PyObject * context
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
static PyObject * mqttv3_setconnected(PyObject *self, PyObject *args)
int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
Definition: MQTTAsync.c:4848
static PyObject * MqttV3Error
static List * callbacks
struct Options options


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:09