server-example.py
Go to the documentation of this file.
1 import sys
2 sys.path.insert(0, "..")
3 import logging
4 from datetime import datetime
5 
6 try:
7  from IPython import embed
8 except ImportError:
9  import code
10 
11  def embed():
12  vars = globals()
13  vars.update(locals())
14  shell = code.InteractiveConsole(vars)
15  shell.interact()
16 
17 
18 from opcua import ua, uamethod, Server
19 
20 
21 class SubHandler(object):
22 
23  """
24  Subscription Handler. To receive events from server for a subscription
25  """
26 
27  def datachange_notification(self, node, val, data):
28  print("Python: New data change event", node, val)
29 
30  def event_notification(self, event):
31  print("Python: New event", event)
32 
33 
34 # method to be exposed through server
35 
36 def func(parent, variant):
37  ret = False
38  if variant.Value % 2 == 0:
39  ret = True
40  return [ua.Variant(ret, ua.VariantType.Boolean)]
41 
42 
43 # method to be exposed through server
44 # uses a decorator to automatically convert to and from variants
45 
46 @uamethod
47 def multiply(parent, x, y):
48  print("multiply method call with parameters: ", x, y)
49  return x * y
50 
51 
52 if __name__ == "__main__":
53  # optional: setup logging
54  logging.basicConfig(level=logging.WARN)
55  #logger = logging.getLogger("opcua.address_space")
56  # logger.setLevel(logging.DEBUG)
57  #logger = logging.getLogger("opcua.internal_server")
58  # logger.setLevel(logging.DEBUG)
59  #logger = logging.getLogger("opcua.binary_server_asyncio")
60  # logger.setLevel(logging.DEBUG)
61  #logger = logging.getLogger("opcua.uaprocessor")
62  # logger.setLevel(logging.DEBUG)
63 
64  # now setup our server
65  server = Server()
66  #server.disable_clock()
67  #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
68  server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
69  server.set_server_name("FreeOpcUa Example Server")
70 
71  # setup our own namespace
72  uri = "http://examples.freeopcua.github.io"
73  idx = server.register_namespace(uri)
74 
75  # create a new node type we can instantiate in our address space
76  dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
77  dev.add_variable(0, "sensor1", 1.0)
78  dev.add_property(0, "device_id", "0340")
79  ctrl = dev.add_object(0, "controller")
80  ctrl.add_property(0, "state", "Idle")
81 
82  # populating our address space
83 
84  # First a folder to organise our nodes
85  myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
86  # instanciate one instance of our device
87  mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
88  mydevice_var = mydevice.get_child(["0:controller", "0:state"]) # get proxy to our device state variable
89  # create directly some objects and variables
90  myobj = server.nodes.objects.add_object(idx, "MyObject")
91  myvar = myobj.add_variable(idx, "MyVariable", 6.7)
92  myvar.set_writable() # Set MyVariable to be writable by clients
93  mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
94  mystringvar.set_writable() # Set MyVariable to be writable by clients
95  mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.now())
96  mydtvar.set_writable() # Set MyVariable to be writable by clients
97  myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
98  myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
99  myprop = myobj.add_property(idx, "myproperty", "I am a property")
100  mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
101  multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])
102 
103  # import some nodes from xml
104  server.import_xml("custom_nodes.xml")
105 
106  # creating a default event object
107  # The event object automatically will have members for all events properties
108  # you probably want to create a custom event type, see other examples
109  myevgen = server.get_event_generator()
110  myevgen.event.Severity = 300
111 
112  # starting!
113  server.start()
114  print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
115  try:
116  # enable following if you want to subscribe to nodes on server side
117  #handler = SubHandler()
118  #sub = server.create_subscription(500, handler)
119  #handle = sub.subscribe_data_change(myvar)
120  # trigger event, all subscribed clients wil receive it
121  mydevice_var.set_value("Running")
122  myevgen.trigger(message="This is BaseEvent")
123 
124  embed()
125  finally:
126  server.stop()
def datachange_notification(self, node, val, data)
def multiply(parent, x, y)
def func(parent, variant)
def event_notification(self, event)


ros_opcua_impl_python_opcua
Author(s): Denis Štogl , Daniel Draper
autogenerated on Tue Jan 19 2021 03:12:44