server.py
Go to the documentation of this file.
00001 import datetime
00002 import time
00003 import sys
00004 sys.path.append(".")
00005 
00006 from IPython import embed
00007 import opcua
00008 
00009 class SubHandler(opcua.SubscriptionHandler):
00010     def data_change(self, handle, node, val, attr):
00011         print("New data change event", handle, node, val, attr)
00012 
00013     def event(self, handle, event):
00014         print("Python: New event", handle, event)
00015 
00016 
00017 if __name__ == "__main__":
00018     # create our server, the argument is for debugging
00019     server = opcua.Server(True)
00020     server.set_endpoint("opc.tcp://localhost:4841/freeopcua/server/")
00021     server.set_server_name("FreeOpcUa Example Server")
00022 
00023     # start the server
00024     server.start()
00025 
00026     try:
00027         # setup our own namespace
00028         uri = "http://examples.freeopcua.github.io"
00029         idx = server.register_namespace(uri)
00030 
00031         # get Objects node, this is where we should put our custom stuff 
00032         objects = server.get_objects_node()
00033         print("I got objects folder: ", objects)
00034 
00035         # now adding some object to our addresse space 
00036         myobject = objects.add_object(idx, "NewObject")
00037         myvar = myobject.add_variable(idx, "MyVariable", [16, 56])
00038         myprop = myobject.add_property(idx, "myprop", 9.9)
00039         myfolder = myobject.add_folder(idx, "myfolder")
00040        
00041         # uncomment next lines to subscribe to changes on server side
00042         #sclt = SubHandler()
00043         #sub = server.create_subscription(100, sclt)
00044         #handle = sub.subscribe_data_change(myvar) #keep handle if you want to delete the particular subscription later
00045         
00046         ev = opcua.Event()
00047         counter = 0
00048         while True:
00049             counter += 1
00050             ev.message = "This is event nr: " + str(counter)
00051             ev.source_node = objects.get_id()
00052             ev.time = datetime.datetime.now()
00053             print("Sending event: ", ev)
00054             server.trigger_event(ev)
00055             myvar.set_value([counter, counter+10])
00056             time.sleep(1)
00057         # start ipython shell so users can test things
00058         #embed()
00059     finally:
00060         server.stop()
00061 


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:56