client-events.py
Go to the documentation of this file.
1 import sys
2 sys.path.insert(0, "..")
3 
4 try:
5  from IPython import embed
6 except ImportError:
7  import code
8 
9  def embed():
10  vars = globals()
11  vars.update(locals())
12  shell = code.InteractiveConsole(vars)
13  shell.interact()
14 
15 
16 from opcua import Client
17 
18 
19 class SubHandler(object):
20 
21  """
22  Subscription Handler. To receive events from server for a subscription
23  data_change and event methods are called directly from receiving thread.
24  Do not do expensive, slow or network operation there. Create another
25  thread if you need to do such a thing
26  """
27  def event_notification(self, event):
28  print("New event recived: ", event)
29 
30 
31 if __name__ == "__main__":
32 
33  client = Client("opc.tcp://localhost:4840/freeopcua/server/")
34  # client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
35  try:
36  client.connect()
37 
38  # Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
39  root = client.get_root_node()
40  print("Objects node is: ", root)
41 
42  # Now getting a variable node using its browse path
43  obj = root.get_child(["0:Objects", "2:MyObject"])
44  print("MyObject is: ", obj)
45 
46  myevent = root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "2:MyFirstEvent"])
47  print("MyFirstEventType is: ", myevent)
48 
49  msclt = SubHandler()
50  sub = client.create_subscription(100, msclt)
51  handle = sub.subscribe_events(obj, myevent)
52 
53  embed()
54  sub.unsubscribe(handle)
55  sub.delete()
56  finally:
57  client.disconnect()
def event_notification(self, event)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:43