subscription.cpp
Go to the documentation of this file.
00001 /******************************************************************************
00002  *   Copyright (C) 2014-2014 Olivier Roulet-Dubonnet          *
00003  *   olivier.roulet@gmail.com          *
00004  *                      *
00005  *   This library is free software; you can redistribute it and/or modify   *
00006  *   it under the terms of the GNU Lesser General Public License as      *
00007  *   published by the Free Software Foundation; version 3 of the License.   *
00008  *                      *
00009  *   This library is distributed in the hope that it will be useful,      *
00010  *   but WITHOUT ANY WARRANTY; without even the implied warranty of      *
00011  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the      *
00012  *   GNU Lesser General Public License for more details.        *
00013  *                      *
00014  *   You should have received a copy of the GNU Lesser General Public License *
00015  *   along with this library; if not, write to the          *
00016  *   Free Software Foundation, Inc.,              *
00017  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.        *
00018  ******************************************************************************/
00019 
00020 
00021 #include <opc/ua/subscription.h>
00022 #include <opc/ua/protocol/string_utils.h>
00023 
00024 #include <boost/asio.hpp>
00025 #include <iostream>
00026 
00027 namespace OpcUa
00028 {
00029   Subscription::Subscription(Services::SharedPtr server, const CreateSubscriptionParameters& params, SubscriptionHandler& callback, bool debug)
00030     : Server(server), Client(callback), Debug(debug)
00031   {
00032     CreateSubscriptionRequest request;
00033     request.Parameters = params;
00034     Services::SharedPtr serverptr = Server;
00035     Data = Server->Subscriptions()->CreateSubscription(request, [this, serverptr](PublishResult i){ this->PublishCallback(serverptr, i); } );
00036     //After creating the subscription, it is expected to send at least one publish request
00037     Server->Subscriptions()->Publish(PublishRequest());
00038     Server->Subscriptions()->Publish(PublishRequest());
00039   }
00040 
00041   void Subscription::Delete()
00042   {
00043     std::vector<StatusCode> results = Server->Subscriptions()->DeleteSubscriptions(std::vector<uint32_t>{Data.SubscriptionId});
00044     for (auto res: results)
00045     {
00046       CheckStatusCode(res);
00047     }
00048   }
00049 
00050   void Subscription::PublishCallback(Services::SharedPtr server, const PublishResult result)
00051   {
00052 
00053     if (Debug){ std::cout << "Subscription | Suscription::PublishCallback called with " <<result.NotificationMessage.NotificationData.size() << " notifications " << std::endl; }
00054     for (const NotificationData& data: result.NotificationMessage.NotificationData )
00055     {
00056       if (data.Header.TypeId == ExpandedObjectId::DataChangeNotification)
00057       {
00058         if (Debug) { std::cout << "Subscription | Notification is of type DataChange\n"; }
00059         CallDataChangeCallback(data);
00060       }
00061       else if (data.Header.TypeId == ExpandedObjectId::EventNotificationList)
00062       {
00063         if (Debug) { std::cout << "Subscription | Notification is of type Event\n"; }
00064         CallEventCallback(data);
00065       }
00066       else if (data.Header.TypeId == ExpandedObjectId::StatusChangeNotification)
00067       {
00068         if (Debug) { std::cout << "Subscription | Notification is of type StatusChange\n"; }
00069         CallStatusChangeCallback(data);
00070       }
00071       else
00072       {
00073         std::cout << "Subscription | Error unknown notficiation type received: " << data.Header.TypeId <<std::endl;
00074       }
00075     }
00076     OpcUa::SubscriptionAcknowledgement ack;
00077     ack.SubscriptionId = GetId();
00078     ack.SequenceNumber = result.NotificationMessage.SequenceNumber;
00079     PublishRequest request;
00080     request.SubscriptionAcknowledgements.push_back(ack);
00081     server->Subscriptions()->Publish(request);
00082   }
00083 
00084   void Subscription::CallDataChangeCallback(const NotificationData& data)
00085   {
00086     for ( const MonitoredItems& item: data.DataChange.Notification)
00087     {
00088       std::unique_lock<std::mutex> lock(Mutex); //could used boost::shared_lock to improve perf
00089 
00090       AttValMap::iterator mapit = AttributeValueMap.find(item.ClientHandle);
00091       if ( mapit == AttributeValueMap.end() )
00092       {
00093         std::cout << "Subscription | Server Error got publishresult for an unknown  monitoreditem id : "<< item.ClientHandle << std::endl; 
00094       }
00095       else
00096       {
00097         AttributeId attr = mapit->second.Attribute;
00098         Node node = mapit->second.TargetNode;
00099         lock.unlock(); //unlock before calling client cades, you never know what they may do
00100         if (Debug) { std::cout << "Subscription | Debug: Calling DataChange user callback " << item.ClientHandle << " and node: " << mapit->second.TargetNode << std::endl; }
00101         Client.DataValueChange(mapit->second.MonitoredItemId, node, item.Value, attr);
00102         Client.DataChange(mapit->second.MonitoredItemId, node, item.Value.Value, attr);
00103       }
00104     }
00105   }
00106 
00107   void Subscription::CallStatusChangeCallback(const NotificationData& data)
00108   {
00109      Client.StatusChange(data.StatusChange.Status);
00110   }
00111 
00112   void Subscription::CallEventCallback(const NotificationData& data)
00113   {
00114     for ( EventFieldList ef :  data.Events.Events)
00115     {
00116       std::unique_lock<std::mutex> lock(Mutex); //could used boost::shared_lock to improve perf
00117 
00118       AttValMap::iterator mapit = AttributeValueMap.find(ef.ClientHandle);
00119       if ( mapit == AttributeValueMap.end() )
00120       {
00121         std::cout << "Subscription | Server Error got publishresult for an unknown  monitoreditem id : "<< ef.ClientHandle << std::endl; 
00122       }
00123       else
00124       {
00125         Event ev;
00126         uint32_t count = 0;
00127         if ( mapit->second.Filter.Event.SelectClauses.size() != ef.EventFields.size() )
00128         {
00129           throw std::runtime_error("Subscription | Error receive event format does not match requested filter");
00130         }
00131         for (SimpleAttributeOperand op : mapit->second.Filter.Event.SelectClauses )
00132         {
00133           //set the default fiedls of events into their event attributes
00134           if ( op.BrowsePath.size() == 1 )
00135           {
00136             if ( op.BrowsePath[0] == QualifiedName("EventId", 0) )
00137             {
00138               ev.EventId = ef.EventFields[count].As<ByteString>();
00139             }
00140             else if ( op.BrowsePath[0] == QualifiedName("EventType", 0) )
00141             {
00142               ev.EventType = ef.EventFields[count].As<NodeId>();
00143             }
00144             else if ( op.BrowsePath[0] == QualifiedName("SourceNode", 0) )
00145             {
00146               ev.SourceNode = ef.EventFields[count].As<NodeId>();
00147             }
00148             else if ( op.BrowsePath[0] == QualifiedName("SourceName", 0) )
00149             {
00150               ev.SourceName = ef.EventFields[count].As<std::string>();
00151             }
00152             else if ( op.BrowsePath[0] == QualifiedName("Message", 0) )
00153             {
00154               ev.Message = ef.EventFields[count].As<LocalizedText>();
00155             }
00156             else if ( op.BrowsePath[0] == QualifiedName("Severity", 0) )
00157             {
00158               ev.Severity = ef.EventFields[count].As<uint16_t>();
00159             }
00160             else if ( op.BrowsePath[0] == QualifiedName("LocalTime", 0) )
00161             {
00162               ev.LocalTime = ef.EventFields[count].As<DateTime>();
00163             }
00164             else if ( op.BrowsePath[0] == QualifiedName("ReceiveTime", 0) )
00165             {
00166               ev.ReceiveTime = ef.EventFields[count].As<DateTime>();
00167             }
00168             else if ( op.BrowsePath[0] == QualifiedName("Time", 0) )
00169             {
00170               ev.Time = ef.EventFields[count].As<DateTime>();
00171             }
00172           }
00173           //Add anyway all fields as value
00174           ev.SetValue(op.BrowsePath, ef.EventFields[count]);
00175           ++count;
00176         }
00177         lock.unlock(); 
00178         if (Debug) { std::cout << "Subscription | Debug: Calling client event callback\n"; }
00179         Client.Event(mapit->second.MonitoredItemId, ev);
00180         if (Debug) { std::cout << "Subscription | Debug: callback call finished\n"; }
00181       }
00182     }
00183   }
00184 
00185   RepublishResponse Subscription::Republish(uint32_t sequenceNumber)
00186   {
00187     RepublishParameters params;
00188     params.SubscriptionId = Data.SubscriptionId;
00189     params.RetransmitSequenceNumber = sequenceNumber;
00190     RepublishResponse response = Server->Subscriptions()->Republish(params);
00191     return response;
00192   }
00193 
00194   uint32_t Subscription::SubscribeDataChange(const Node& node, AttributeId attr)
00195   {
00196     ReadValueId avid;
00197     avid.NodeId = node.GetId();
00198     avid.AttributeId = attr;
00199     //avid.IndexRange //We leave it null, then the entire array is returned
00200     std::vector<uint32_t> results = SubscribeDataChange(std::vector<ReadValueId>({avid}));
00201     if (results.size() != 1) { throw std::runtime_error("Subscription | Server error, SubscribeDataChange should have returned exactly one result"); }
00202     return results.front();
00203   }
00204 
00205   std::vector<MonitoredItemCreateResult> Subscription::Subscribe(std::vector<MonitoredItemCreateRequest> request)
00206   {
00207     std::unique_lock<std::mutex> lock(Mutex); 
00208 
00209     MonitoredItemsParameters itemsParams;
00210     itemsParams.SubscriptionId = Data.SubscriptionId;
00211     itemsParams.TimestampsToReturn = TimestampsToReturn(2); // Don't know for better
00212     for (auto req : request)
00213     {
00214       itemsParams.ItemsToCreate.push_back(req);
00215     }
00216 
00217     return  Server->Subscriptions()->CreateMonitoredItems(itemsParams);
00218   }
00219  
00220   std::vector<uint32_t> Subscription::SubscribeDataChange(const std::vector<ReadValueId>& attributes)
00221   {
00222     std::unique_lock<std::mutex> lock(Mutex); 
00223 
00224     MonitoredItemsParameters itemsParams;
00225     itemsParams.SubscriptionId = Data.SubscriptionId;
00226     itemsParams.TimestampsToReturn = TimestampsToReturn(2); // Don't know for better
00227 
00228     for (ReadValueId attr : attributes)
00229     {
00230       MonitoredItemCreateRequest req;
00231       req.ItemToMonitor = attr;
00232       req.MonitoringMode = MonitoringMode::Reporting;
00233       MonitoringParameters params;
00234       params.SamplingInterval = Data.RevisedPublishingInterval;
00235       params.QueueSize = 1;
00236       params.DiscardOldest = true;
00237       params.ClientHandle = (uint32_t)++LastMonitoredItemHandle;
00238       req.RequestedParameters = params;
00239       itemsParams.ItemsToCreate.push_back(req);
00240     }
00241 
00242     std::vector<MonitoredItemCreateResult> results =  Server->Subscriptions()->CreateMonitoredItems(itemsParams);
00243 
00244     if ( results.size() != attributes.size() ) 
00245     {
00246       throw(std::runtime_error("Subscription | Error server did not send answer for all monitoreditem requests"));
00247     }
00248 
00249     std::vector<uint32_t> monitoredItemsIds;
00250     unsigned int i = 0;
00251     for (const auto& res : results)
00252     {
00253       CheckStatusCode(res.Status);
00254       if (Debug ) { std::cout << "Subscription | storing monitoreditem with handle " << itemsParams.ItemsToCreate[i].RequestedParameters.ClientHandle << " and id " << res.MonitoredItemId << std::endl;  }
00255       MonitoredItemData mdata; 
00256       mdata.MonitoredItemId = res.MonitoredItemId;
00257       mdata.Attribute =  attributes[i].AttributeId;
00258       mdata.TargetNode =  Node(Server, attributes[i].NodeId);
00259       AttributeValueMap[itemsParams.ItemsToCreate[i].RequestedParameters.ClientHandle] = mdata;
00260       monitoredItemsIds.push_back(res.MonitoredItemId);
00261       ++i;
00262     }
00263     return monitoredItemsIds;
00264   }
00265 
00266   void Subscription::setUsrPtr(uint32_t handle,UserData *usr)
00267   {
00268       AttributeValueMap[handle].usrVar = usr;
00269   }
00270 
00271   UserData * Subscription::getUsrPtr(uint32_t handle)
00272   {
00273       return AttributeValueMap[handle].usrVar;
00274   }
00275 
00276   void Subscription::UnSubscribe(uint32_t handle)
00277   {
00278     return UnSubscribe(std::vector<uint32_t>(1, handle));
00279   }
00280 
00281   void Subscription::UnSubscribe(std::vector<uint32_t> handles) 
00282   {
00283     std::unique_lock<std::mutex> lock(Mutex); 
00284 
00285     DeleteMonitoredItemsParameters params;
00286     params.SubscriptionId = Data.SubscriptionId;
00287     std::vector<uint32_t> mids;
00288     for (auto id : handles)
00289     {
00290       if (Debug) std::cout << "Subscription | Sending unsubscribe for monitoreditemsid: " << id << std::endl;
00291       mids.push_back(uint32_t(id));
00292       //Now trying to remove monitoreditem from our internal cache
00293       for ( auto pair : AttributeValueMap )
00294       {
00295         if (pair.second.MonitoredItemId == id)
00296         {
00297           AttributeValueMap.erase(pair.first);
00298           break; //we modified our iterating object, so quit!!
00299         }
00300       }
00301     }
00302     params.MonitoredItemIds = mids;
00303     auto results = Server->Subscriptions()-> DeleteMonitoredItems(params);
00304     for (auto res : results)
00305     {
00306       CheckStatusCode(res);
00307     }
00308   }
00309 
00310   uint32_t Subscription::SubscribeEvents()
00311   {
00312     return SubscribeEvents(Node(Server, ObjectId::Server), Node(Server, ObjectId::BaseEventType));
00313   }
00314 
00315   uint32_t Subscription::SubscribeEvents(const Node& node, const Node& eventtype)  
00316   {
00317     EventFilter filter;
00318     if (Debug) std::cout << "Subscription | Subscribing events with filter for properties:" << std::endl;
00319     for ( Node& child: eventtype.GetProperties() )
00320     {
00321       if (Debug) std::cout << "      property: "<< child.GetBrowseName() << std::endl;
00322       SimpleAttributeOperand op;
00323       op.TypeId = eventtype.GetId();
00324       op.Attribute = AttributeId::Value;
00325       op.BrowsePath = std::vector<QualifiedName>({child.GetBrowseName()});
00326       filter.SelectClauses.push_back(op);
00327     }
00328     return SubscribeEvents(node, filter);
00329   }
00330 
00331   uint32_t Subscription::SubscribeEvents(const Node& node, const EventFilter& eventfilter)
00332   {
00333     std::unique_lock<std::mutex> lock(Mutex); 
00334 
00335     MonitoredItemsParameters itemsParams;
00336     itemsParams.SubscriptionId = Data.SubscriptionId;
00337     itemsParams.TimestampsToReturn = TimestampsToReturn(2); // Don't know for better
00338 
00339     ReadValueId avid;
00340     avid.NodeId = node.GetId();
00341     avid.AttributeId = AttributeId::EventNotifier;
00342 
00343     MonitoredItemCreateRequest req;
00344     req.ItemToMonitor = avid;
00345     req.MonitoringMode = MonitoringMode::Reporting;
00346     MonitoringParameters params;
00347     params.SamplingInterval = Data.RevisedPublishingInterval;
00348     params.QueueSize = std::numeric_limits<uint32_t>::max();
00349     params.DiscardOldest = true;
00350     params.ClientHandle = (uint32_t)++LastMonitoredItemHandle;
00351 
00352     MonitoringFilter filter(eventfilter);
00353     params.Filter = filter;
00354     req.RequestedParameters = params;
00355     itemsParams.ItemsToCreate.push_back(req);
00356 
00357     std::vector<MonitoredItemCreateResult> results =  Server->Subscriptions()->CreateMonitoredItems(itemsParams);
00358     if ( results.size()  != 1 )
00359     {
00360       throw(std::runtime_error("Subscription | Protocol Error CreateMonitoredItems should return one result"));
00361     }
00362 
00363     MonitoredItemData mdata;
00364     mdata.TargetNode = Node(Server, avid.NodeId);
00365     mdata.Attribute = avid.AttributeId;
00366     mdata.MonitoredItemId = results[0].MonitoredItemId;
00367     mdata.Filter = results[0].FilterResult;
00368     AttributeValueMap[params.ClientHandle] = mdata;
00369 
00370 
00371     MonitoredItemCreateResult res = results[0];
00372     CheckStatusCode(res.Status);
00373     SimpleAttributeOperandMap[res.MonitoredItemId] = eventfilter; //Not used
00374     return res.MonitoredItemId;
00375   }
00376 
00377 }


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:57