12 This class is inherited by the common event objects such as BaseEvent, 13 other auto standard events and custom events 14 Events are used to trigger events on server side and are 15 sent to clients for every event from the server 18 On server side the data type of attributes should be known, thus 19 add properties using the add_property method!!! 31 return "{0}({1})".format(
32 self.__class__.__name__,
33 [str(k) +
":" + str(v)
for k, v
in self.__dict__.items()
if k
not in self.
internal_properties])
38 Add a property to event and store its data type 40 setattr(self, name, val)
45 convert all properties of the Event class to a dict of variants 48 for key, value
in vars(self).items():
56 Create an Event object from a dict of name and variants 59 for k, v
in fields.items():
60 ev.add_property(k, v.Value, v.VariantType)
65 Using a new select_clauses and the original select_clauses 66 used during subscription, return a field list 69 for sattr
in select_clauses:
71 if sattr.BrowsePath == o_sattr.BrowsePath
and sattr.AttributeId == o_sattr.AttributeId:
78 return a field list using a select clause and the object properties 81 for sattr
in select_clauses:
82 if not sattr.BrowsePath:
85 name = sattr.BrowsePath[0].Name
87 val = getattr(self, name)
88 except AttributeError:
98 Instantiate an Event object from a select_clauses and fields 101 ev.select_clauses = select_clauses
102 ev.event_fields = fields
103 for idx, sattr
in enumerate(select_clauses):
104 if len(sattr.BrowsePath) == 0:
105 name = sattr.AttributeId.name
107 name = sattr.BrowsePath[0].Name
108 ev.add_property(name, fields[idx].Value, fields[idx].VariantType)
123 for evtype
in evtypes:
125 if prop.get_browse_name()
not in selected_paths:
127 op.AttributeId = ua.AttributeIds.Value
128 op.BrowsePath = [prop.get_browse_name()]
130 selected_paths.append(prop.get_browse_name())
146 op.AttributeId = ua.AttributeIds.Value
147 el.FilterOperands.append(op)
151 for evtype
in evtypes:
152 subtypes += [st.nodeid
for st
in ua_utils.get_node_subtypes(evtype)]
153 subtypes = list(set(subtypes))
154 for subtypeid
in subtypes:
157 el.FilterOperands.append(op)
159 el.FilterOperator = ua.FilterOperator.InList
160 cf.Elements.append(el)
170 properties.extend(curr_node.get_properties())
172 if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
175 parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=
True)
176 if len(parents) != 1:
178 curr_node = parents[0]
185 return an Event object from an event type node 187 if node.nodeid.Identifier
in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
188 return opcua.common.event_objects.IMPLEMENTED_EVENTS[node.nodeid.Identifier]()
192 class CustomEvent(parent_eventtype):
195 parent_eventtype.__init__(self)
196 self.EventType = node.nodeid
199 while curr_node.nodeid.Identifier != parent_identifier:
200 for prop
in curr_node.get_properties():
201 name = prop.get_browse_name().Name
202 val = prop.get_data_value()
203 self.add_property(name, val.Value.Value, val.Value.VariantType)
204 parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=
True)
206 if len(parents) != 1:
207 raise UaError(
"Parent of event type could notbe found")
208 curr_node = parents[0]
218 parents = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=
True)
220 if len(parents) != 1:
221 raise UaError(
"Parent of event type could notbe found")
222 if parents[0].nodeid.Identifier
in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
223 return parents[0].nodeid.Identifier, opcua.common.event_objects.IMPLEMENTED_EVENTS[parents[0].nodeid.Identifier]
def _find_parent_eventtype(node)
def select_clauses_from_evtype(evtypes)
def from_field_dict(fields)
def from_event_fields(select_clauses, fields)
def where_clause_from_evtype(evtypes)
def get_filter_from_event_type(eventtypes)
def get_event_props_as_fields_dict(self)
def get_event_properties_from_type_node(node)
def get_event_obj_from_type_node(node)
def add_property(self, name, val, datatype)
def to_event_fields_using_subscription_fields(self, select_clauses)
def to_event_fields(self, select_clauses)