42 publish_msg_fields = [(
True,
"topic", string_types)]
48 Capability.__init__(self, protocol)
51 protocol.register_operation(
"publish", self.
publish)
56 if protocol.parameters
and "unregister_timeout" in protocol.parameters:
57 manager.unregister_timeout = protocol.parameters.get(
"unregister_timeout")
62 topic = message[
"topic"]
63 latch = message.get(
"latch",
False)
64 queue_size = message.get(
"queue_size", 100)
66 if Publish.topics_glob
is not None and Publish.topics_glob:
67 self.protocol.log(
"debug",
"Topic security glob enabled, checking topic: " + topic)
69 for glob
in Publish.topics_glob:
70 if (fnmatch.fnmatch(topic, glob)):
71 self.protocol.log(
"debug",
"Found match with glob " + glob +
", continuing publish...")
75 self.protocol.log(
"warn",
"No match found for topic, cancelling publish to: " + topic)
78 self.protocol.log(
"debug",
"No topic security glob, not checking publish.")
81 client_id = self.protocol.client_id
82 manager.register(client_id, topic, latch=latch, queue_size=queue_size)
86 msg = message.get(
"msg", {})
89 manager.publish(client_id, topic, msg, latch=latch, queue_size=queue_size)
92 client_id = self.protocol.client_id
94 manager.unregister(client_id, topic)
95 self._published.clear()
def __init__(self, protocol)
def basic_type_check(self, msg, types_info)
def publish(self, message)