Go to the documentation of this file.00001 import sys
00002 sys.path.insert(0, "..")
00003
00004 try:
00005 from IPython import embed
00006 except ImportError:
00007 import code
00008
00009 def embed():
00010 vars = globals()
00011 vars.update(locals())
00012 shell = code.InteractiveConsole(vars)
00013 shell.interact()
00014
00015
00016 from opcua import Client
00017
00018
00019 class SubHandler(object):
00020
00021 """
00022 Subscription Handler. To receive events from server for a subscription
00023 data_change and event methods are called directly from receiving thread.
00024 Do not do expensive, slow or network operation there. Create another
00025 thread if you need to do such a thing
00026 """
00027 def event_notification(self, event):
00028 print("New event recived: ", event)
00029
00030
00031 if __name__ == "__main__":
00032
00033 client = Client("opc.tcp://localhost:4840/freeopcua/server/")
00034
00035 try:
00036 client.connect()
00037
00038
00039 root = client.get_root_node()
00040 print("Objects node is: ", root)
00041
00042
00043 obj = root.get_child(["0:Objects", "2:MyObject"])
00044 print("MyObject is: ", obj)
00045
00046 myevent = root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "2:MyFirstEvent"])
00047 print("MyFirstEventType is: ", myevent)
00048
00049 msclt = SubHandler()
00050 sub = client.create_subscription(100, msclt)
00051 handle = sub.subscribe_events(obj, myevent)
00052
00053 embed()
00054 sub.unsubscribe(handle)
00055 sub.delete()
00056 finally:
00057 client.disconnect()