import sys
sys.path.insert(0, "..")
import logging

from opcua import Client
from opcua import ua
from IPython import embed
00008
00009
class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription.

    An instance of this class is handed to ``client.create_subscription``;
    its two methods are the callbacks that report incoming notifications
    by printing them to standard output.
    """

    def datachange_notification(self, node, val, data):
        """Print the node and its new value for a data change notification.

        ``data`` is accepted as part of the callback signature but is not
        used here.
        """
        print("Python: New data change event", node, val)

    def event_notification(self, event):
        """Print the ``EventType`` attribute of a received event."""
        print("Python: New event", event.EventType)
00021
00022
00023
00024
if __name__ == "__main__":
    # Example script: connect to an OPC UA server, subscribe to events,
    # request a condition refresh, then open an interactive IPython shell.

    logging.basicConfig(level=logging.WARN)
    client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")

    try:
        client.connect()
        root = client.get_root_node()
        print("Root is", root)

        # Notifications for this subscription are delivered to SubHandler.
        # NOTE(review): 500 is presumably the publishing interval in
        # milliseconds - confirm against the opcua Client API.
        handler = SubHandler()
        sub = client.create_subscription(500, handler)
        # NOTE(review): 2788 looks like the numeric id of a standard event
        # type (possibly RefreshEndEventType) - confirm.
        # `handle` is not used below, but it stays bound so it is available
        # from the interactive shell started by embed().
        handle = sub.subscribe_events(evtype=2788)

        # Look up the ConditionType node by browse path and call its
        # ConditionRefresh method, passing this subscription's id as UInt32.
        cond = root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "0:ConditionType"])
        cond.call_method("0:ConditionRefresh", ua.Variant(sub.subscription_id, ua.VariantType.UInt32))

        # Drop into an IPython shell; the script continues to the cleanup
        # below once the shell exits.
        embed()
    finally:
        # Always disconnect, whether the block above finished or raised.
        client.disconnect()