server-example.py
Go to the documentation of this file.
00001 import sys
00002 sys.path.insert(0, "..")
00003 import logging
00004 from datetime import datetime
00005 
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open a plain interactive console when IPython is not installed.

        The console runs directly on this module's global namespace, so
        every module-level name is reachable from the prompt and names
        bound in the session remain in the module afterwards.
        """
        # Hand the module namespace over untouched. Merging locals() into it
        # would only add this function's own temporaries to the module
        # globals, where they could hide builtins inside the session.
        shell = code.InteractiveConsole(globals())
        shell.interact()
00016 
00017 
00018 from opcua import ua, uamethod, Server
00019 
00020 
class SubHandler(object):

    """
    Callback sink for a subscription: receives data-change and event
    notifications and prints each one to standard output.
    """

    def datachange_notification(self, node, val, data):
        # `data` is accepted but not used; only the node and value are shown.
        line = "Python: New data change event {!s} {!s}".format(node, val)
        print(line)

    def event_notification(self, event):
        line = "Python: New event {!s}".format(event)
        print(line)
00032 
00033 
00034 # method to be exposed through server
00035 
def func(parent, variant):
    """Tell whether the value wrapped by *variant* is even.

    parent is not used. The result is a one-element list holding a
    Boolean ua.Variant: True when ``variant.Value`` is divisible by two,
    False otherwise.
    """
    is_even = bool(variant.Value % 2 == 0)
    return [ua.Variant(is_even, ua.VariantType.Boolean)]
00041 
00042 
00043 # method to be exposed through server
00044 # uses a decorator to automatically convert to and from variants
00045 
@uamethod
def multiply(parent, x, y):
    """Print both operands, then return their product.

    parent is not used.
    """
    print("multiply method call with parameters: ", x, y)
    product = x * y
    return product
00050 
00051 
# Script entry point: configure a server, build a small address space,
# start the server and drop into an interactive shell until the user exits.
if __name__ == "__main__":
    # optional: set up logging
    logging.basicConfig(level=logging.WARN)
    # Uncomment a pair below to get debug output from a single logger.
    #logger = logging.getLogger("opcua.address_space")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.internal_server")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.binary_server_asyncio")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.uaprocessor")
    # logger.setLevel(logging.DEBUG)

    # now set up our server
    server = Server()
    #server.disable_clock()
    # The endpoint uses 0.0.0.0 instead of localhost (alternative kept below),
    # presumably so the server is reachable from other hosts.
    #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
    server.set_server_name("FreeOpcUa Example Server")

    # set up our own namespace; idx is passed to the nodes created below
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # create a new node type we can instantiate in our address space
    # (the type and its children are created with namespace index 0)
    dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
    dev.add_variable(0, "sensor1", 1.0)
    dev.add_property(0, "device_id", "0340")
    ctrl = dev.add_object(0, "controller")
    ctrl.add_property(0, "state", "Idle")

    # populating our address space

    # First a folder to organise our nodes
    myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
    # instantiate one instance of our device type
    mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
    mydevice_var = mydevice.get_child(["0:controller", "0:state"])  # get proxy to our device state variable
    # create some objects and variables directly
    myobj = server.nodes.objects.add_object(idx, "MyObject")
    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    myvar.set_writable()    # Set MyVariable to be writable by clients
    mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
    mystringvar.set_writable()    # Set MyStringVariable to be writable by clients
    mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.now())
    mydtvar.set_writable()    # Set MyDateTimeVar to be writable by clients
    myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
    # NOTE: the next line rebinds myarrayvar, so the "myarrayvar" node created
    # above is no longer reachable through a Python name in this script.
    # The explicit ua.Variant gives the empty list a UInt32 variant type.
    myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
    myprop = myobj.add_property(idx, "myproperty", "I am a property")
    # Expose the two module-level callables as methods of MyObject; the two
    # lists are passed as the input and output argument types.
    mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
    multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])

    # import some nodes from xml (path is relative to the working directory)
    server.import_xml("custom_nodes.xml")

    # creating a default event object
    # The event object will automatically have members for all event properties
    # you probably want to create a custom event type, see other examples
    myevgen = server.get_event_generator()
    myevgen.event.Severity = 300

    # starting!
    server.start()
    print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
    # Everything after start() runs inside try/finally so that the server is
    # stopped even if the shell or one of the calls below raises.
    try:
        # enable the following if you want to subscribe to nodes on the server side
        #handler = SubHandler()
        #sub = server.create_subscription(500, handler)
        #handle = sub.subscribe_data_change(myvar)
        # update the device state, then trigger an event; all subscribed clients will receive it
        mydevice_var.set_value("Running")
        myevgen.trigger(message="This is BaseEvent")

        # hand control to the interactive shell; returns when the user exits it
        embed()
    finally:
        server.stop()


ros_opcua_impl_python_opcua
Author(s): Denis Štogl, Daniel Draper
autogenerated on Sat Jun 8 2019 18:26:23