mqtt_client.py
Go to the documentation of this file.
1 from typing import Dict, Callable
2 
3 import paho.mqtt.client as mqtt
4 
5 
def default_mqtt_client_factory(params: Dict) -> mqtt.Client:
    """Create and configure a paho MQTT client from a parameter mapping.

    Every top-level key of ``params`` is optional; a missing or empty
    section is skipped. ``params`` and its nested mappings are not
    modified.

    Args:
        params: Configuration mapping. Recognized keys:

            * ``client``: keyword arguments for ``mqtt.Client``.
            * ``tls``: keyword arguments for ``Client.tls_set``, plus an
              optional ``tls_insecure`` flag (default ``False``) that is
              forwarded to ``Client.tls_insecure_set``.
            * ``account``: keyword arguments for
              ``Client.username_pw_set``.
            * ``message``: optional ``max_inflight_messages``,
              ``max_queued_messages`` and ``message_retry`` values.
            * ``userdata``: object handed to ``Client.user_data_set``.
            * ``will``: keyword arguments for ``Client.will_set``.

    Returns:
        The configured client. No connection is opened here.
    """
    # create client
    # ``or {}`` tolerates a section that is present but set to None.
    client_params = params.get('client') or {}
    client = mqtt.Client(**client_params)

    # configure tls
    tls_params = params.get('tls', {})
    if tls_params:
        # Work on a shallow copy: popping 'tls_insecure' from the caller's
        # dict would make a second call with the same params lose the flag.
        tls_kwargs = dict(tls_params)
        tls_insecure = tls_kwargs.pop('tls_insecure', False)
        client.tls_set(**tls_kwargs)
        client.tls_insecure_set(tls_insecure)

    # configure username and password
    account_params = params.get('account', {})
    if account_params:
        client.username_pw_set(**account_params)

    # configure message params
    message_params = params.get('message', {})
    if message_params:
        # Compare against None so an explicit 0 is still forwarded.
        inflight = message_params.get('max_inflight_messages')
        if inflight is not None:
            client.max_inflight_messages_set(inflight)
        queue_size = message_params.get('max_queued_messages')
        if queue_size is not None:
            client.max_queued_messages_set(queue_size)
        retry = message_params.get('message_retry')
        if retry is not None:
            # NOTE(review): message_retry_set may be deprecated or absent in
            # newer paho-mqtt releases -- confirm against the pinned version.
            client.message_retry_set(retry)

    # configure userdata
    userdata = params.get('userdata', {})
    if userdata:
        client.user_data_set(userdata)

    # configure will params
    will_params = params.get('will', {})
    if will_params:
        client.will_set(**will_params)

    return client
48 
49 
def create_private_path_extractor(mqtt_private_path: str) -> Callable[[str], str]:
    """Build a function that expands the ``~/`` shorthand in topic paths.

    Args:
        mqtt_private_path: Prefix substituted for a leading ``~/``.

    Returns:
        A callable taking a topic path. A path starting with ``~/`` comes
        back as ``<mqtt_private_path>/<rest of path>``; any other path is
        returned unchanged.
    """
    private_prefix = '~/'

    def extractor(topic_path):
        # Guard clause: only paths using the shorthand are rewritten.
        if not topic_path.startswith(private_prefix):
            return topic_path
        relative_part = topic_path[len(private_prefix):]
        return '{}/{}'.format(mqtt_private_path, relative_part)

    return extractor
56 
57 
# Names exported by ``from <module> import *``.
__all__ = [
    'default_mqtt_client_factory',
    'create_private_path_extractor',
]


mqtt_bridge
Author(s): Junya Hayashi
autogenerated on Fri Jan 27 2023 03:35:02