events.py
Go to the documentation of this file.
00001 import copy
00002 
00003 from opcua import ua
00004 import opcua
00005 from opcua.ua.uaerrors import UaError
00006 from opcua.common import ua_utils
00007 
00008 
00009 class Event(object):
00010     """
00011     OPC UA Event object.
00012     This is class in inherited by the common event objects such as BaseEvent,
00013     other auto standard events and custom events
00014     Events are used to trigger events on server side and are
00015     sent to clients for every events from server
00016 
00017     Developper Warning:
00018     On server side the data type of attributes should be known, thus
00019     add properties using the add_property method!!!
00020     """
00021 
00022     def __init__(self):
00023         self.server_handle = None
00024         self.select_clauses = None
00025         self.event_fields = None
00026         self.data_types = {}
00027         # save current attributes
00028         self.internal_properties = list(self.__dict__.keys())[:] + ["internal_properties"]
00029 
00030     def __str__(self):
00031         return "{0}({1})".format(
00032             self.__class__.__name__, 
00033             [str(k) + ":" + str(v) for k, v in self.__dict__.items() if k not in self.internal_properties])
00034     __repr__ = __str__
00035 
00036     def add_property(self, name, val, datatype):
00037         """
00038         Add a property to event and tore its data type
00039         """
00040         setattr(self, name, val)
00041         self.data_types[name] = datatype
00042 
00043     def get_event_props_as_fields_dict(self):
00044         """
00045         convert all properties of the Event class to a dict of variants
00046         """
00047         field_vars = {}
00048         for key, value in vars(self).items():
00049             if not key.startswith("__") and key not in self.internal_properties:
00050                 field_vars[key] = ua.Variant(value, self.data_types[key])
00051         return field_vars
00052 
00053     @staticmethod
00054     def from_field_dict(fields):
00055         """
00056         Create an Event object from a dict of name and variants
00057         """
00058         ev = Event()
00059         for k, v in fields.items():
00060             ev.add_property(k, v.Value, v.VariantType)
00061         return ev
00062 
00063     def to_event_fields_using_subscription_fields(self, select_clauses):
00064         """
00065         Using a new select_clauses and the original select_clauses
00066         used during subscription, return a field list 
00067         """
00068         fields = []
00069         for sattr in select_clauses:
00070             for idx, o_sattr in enumerate(self.select_clauses):
00071                 if sattr.BrowsePath == o_sattr.BrowsePath and sattr.AttributeId == o_sattr.AttributeId:
00072                     fields.append(self.event_fields[idx])
00073                     break
00074         return fields
00075 
00076     def to_event_fields(self, select_clauses):
00077         """
00078         return a field list using a select clause and the object properties
00079         """
00080         fields = []
00081         for sattr in select_clauses:
00082             if not sattr.BrowsePath:
00083                 name = ua.AttributeIds(sattr.AttributeId).name
00084             else:
00085                 name = sattr.BrowsePath[0].Name
00086             try:
00087                 val = getattr(self, name)
00088             except AttributeError:
00089                 field = ua.Variant(None)
00090             else:
00091                 field = ua.Variant(copy.deepcopy(val), self.data_types[name])
00092             fields.append(field)
00093         return fields
00094 
00095     @staticmethod
00096     def from_event_fields(select_clauses, fields):
00097         """
00098         Instantiate an Event object from a select_clauses and fields
00099         """
00100         ev = Event()
00101         ev.select_clauses = select_clauses
00102         ev.event_fields = fields
00103         for idx, sattr in enumerate(select_clauses):
00104             if len(sattr.BrowsePath) == 0:
00105                 name = sattr.AttributeId.name
00106             else:
00107                 name = sattr.BrowsePath[0].Name
00108             ev.add_property(name, fields[idx].Value, fields[idx].VariantType)
00109         return ev
00110 
00111 
00112 def get_filter_from_event_type(eventtypes):
00113     evfilter = ua.EventFilter()
00114     evfilter.SelectClauses = select_clauses_from_evtype(eventtypes)
00115     evfilter.WhereClause = where_clause_from_evtype(eventtypes)
00116     return evfilter
00117 
00118 
00119 def select_clauses_from_evtype(evtypes):
00120     clauses = []
00121 
00122     selected_paths = []
00123     for evtype in evtypes:
00124         for prop in get_event_properties_from_type_node(evtype):
00125             if prop.get_browse_name() not in selected_paths:
00126                 op = ua.SimpleAttributeOperand()
00127                 op.AttributeId = ua.AttributeIds.Value
00128                 op.BrowsePath = [prop.get_browse_name()]
00129                 clauses.append(op)
00130                 selected_paths.append(prop.get_browse_name())
00131     return clauses
00132 
00133 
00134 def where_clause_from_evtype(evtypes):
00135     cf = ua.ContentFilter()
00136     el = ua.ContentFilterElement()
00137     
00138     # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
00139     # Create a clause where the generate event type property EventType
00140     # must be a subtype of events in evtypes argument
00141 
00142     # the first operand is the attribute event type
00143     op = ua.SimpleAttributeOperand()
00144     # op.TypeDefinitionId = evtype.nodeid
00145     op.BrowsePath.append(ua.QualifiedName("EventType", 0))
00146     op.AttributeId = ua.AttributeIds.Value
00147     el.FilterOperands.append(op)
00148 
00149     # now create a list of all subtypes we want to accept
00150     subtypes = []
00151     for evtype in evtypes:
00152         subtypes += [st.nodeid for st in ua_utils.get_node_subtypes(evtype)]
00153     subtypes = list(set(subtypes))  # remove duplicates
00154     for subtypeid in subtypes:
00155         op = ua.LiteralOperand()
00156         op.Value = ua.Variant(subtypeid)
00157         el.FilterOperands.append(op)
00158 
00159     el.FilterOperator = ua.FilterOperator.InList
00160     cf.Elements.append(el)
00161 
00162     return cf
00163 
00164 
00165 def get_event_properties_from_type_node(node):
00166     properties = []
00167     curr_node = node
00168 
00169     while True:
00170         properties.extend(curr_node.get_properties())
00171 
00172         if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
00173             break
00174 
00175         parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
00176         if len(parents) != 1:  # Something went wrong
00177             return None
00178         curr_node = parents[0]
00179 
00180     return properties
00181 
00182 
00183 def get_event_obj_from_type_node(node):
00184     """
00185     return an Event object from an event type node
00186     """
00187     if node.nodeid.Identifier in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
00188         return opcua.common.event_objects.IMPLEMENTED_EVENTS[node.nodeid.Identifier]()
00189     else:
00190         parent_identifier, parent_eventtype = _find_parent_eventtype(node)
00191 
00192         class CustomEvent(parent_eventtype):
00193 
00194             def __init__(self):
00195                 parent_eventtype.__init__(self)
00196                 self.EventType = node.nodeid
00197                 curr_node = node
00198 
00199                 while curr_node.nodeid.Identifier != parent_identifier:
00200                     for prop in curr_node.get_properties():
00201                         name = prop.get_browse_name().Name
00202                         val = prop.get_data_value()
00203                         self.add_property(name, val.Value.Value, val.Value.VariantType)
00204                     parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
00205 
00206                     if len(parents) != 1:  # Something went wrong
00207                         raise UaError("Parent of event type could notbe found")
00208                     curr_node = parents[0]
00209 
00210                 self._freeze = True
00211 
00212     return CustomEvent()
00213 
00214 
00215 def _find_parent_eventtype(node):
00216     """
00217     """
00218     parents = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
00219 
00220     if len(parents) != 1:   # Something went wrong
00221         raise UaError("Parent of event type could notbe found")
00222     if parents[0].nodeid.Identifier in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
00223         return parents[0].nodeid.Identifier, opcua.common.event_objects.IMPLEMENTED_EVENTS[parents[0].nodeid.Identifier]
00224     else:
00225         return _find_parent_eventtype(parents[0])
00226 


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23