internal_subscription.cpp
Go to the documentation of this file.
00001 #include "internal_subscription.h"
00002 
00003 #include <boost/thread/locks.hpp>
00004 
00005 namespace OpcUa
00006 {
00007   namespace Internal
00008   {
00009 
00010     InternalSubscription::InternalSubscription(SubscriptionServiceInternal& service, const SubscriptionData& data, const NodeId& SessionAuthenticationToken, std::function<void (PublishResult)> callback, bool debug)
00011       : Service(service)
00012       , AddressSpace(Service.GetAddressSpace())
00013       , Data(data)
00014       , CurrentSession(SessionAuthenticationToken)
00015       , Callback(callback)
00016       , io(service.GetIOService())
00017       , Timer(io, boost::posix_time::milliseconds(data.RevisedPublishingInterval))
00018       , LifeTimeCount(data.RevisedLifetimeCount)
00019       , Debug(debug)
00020     {
00021     }
00022 
00023     void InternalSubscription::Start()
00024     {
00025       std::shared_ptr<InternalSubscription> self = shared_from_this();
00026       Timer.async_wait([self](const boost::system::error_code& error){ self->PublishResults(error); });
00027     }
00028 
00029     InternalSubscription::~InternalSubscription()
00030     {
00031       //Stop(); 
00032     }
00033 
00034     void InternalSubscription::Stop()
00035     {
00036       DeleteAllMonitoredItems();
00037       Timer.cancel();
00038     }
00039 
00040     void InternalSubscription::DeleteAllMonitoredItems()
00041     {
00042       if (Debug) std::cout << "InternalSubscription | Deleting all monitoreditems" << std::endl; 
00043       boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00044 
00045       std::vector<uint32_t> handles;
00046       for (auto pair : MonitoredDataChanges)
00047       {
00048         handles.push_back(pair.first);
00049       }
00050       lock.unlock();
00051       DeleteMonitoredItemsIds(handles);
00052     }
00053 
00054     bool InternalSubscription::HasExpired()
00055     {
00056       bool expired = KeepAliveCount > LifeTimeCount ;
00057       if ( expired )
00058       {
00059         if (Debug) { std::cout << "InternalSubscription | Subscription has expired " << KeepAliveCount << "  " << LifeTimeCount << std::endl; }
00060       }
00061       return expired;
00062     }
00063 
00064     void InternalSubscription::PublishResults(const boost::system::error_code& error)
00065     {
00066       if ( error )
00067       {
00068         if (Debug) std::cout << "InternalSubscription | Stopping subscription timer" << std::endl;
00069         return; 
00070       }
00071       if ( HasExpired() )
00072       {
00073         if (Debug) { std::cout << "InternalSubscription | Subscription has expired" << std::endl; }
00074         return; 
00075       }
00076 
00077       if ( HasPublishResult() && Service.PopPublishRequest(CurrentSession) ) //Check we received a publishrequest before sening respomse
00078       {
00079 
00080         std::vector<PublishResult> results = PopPublishResult();
00081         if (results.size() > 0 )
00082         {
00083           if (Debug) { std::cout << "InternalSubscription | Subscription has " << results.size() << " results, calling callback" << std::endl; }
00084           if ( Callback )
00085           {
00086             Callback(results[0]);
00087           }
00088           else
00089           {
00090             if (Debug) std::cout << "InternalSubcsription | No callback defined for this subscription" << std::endl;
00091           }
00092          }
00093       }
00094       TimerStopped = false;
00095       Timer.expires_at(Timer.expires_at() + boost::posix_time::milliseconds(Data.RevisedPublishingInterval));
00096       std::shared_ptr<InternalSubscription> self = shared_from_this();
00097       Timer.async_wait([self](const boost::system::error_code& error){ self->PublishResults(error); });
00098     }
00099 
00100 
00101     bool InternalSubscription::HasPublishResult()
00102     {
00103       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00104       
00105       if ( Startup || ! TriggeredDataChangeEvents.empty() || ! TriggeredEvents.empty() ) 
00106       {
00107         return true;
00108       }
00109       if ( KeepAliveCount > Data.RevisedMaxKeepAliveCount ) //we need to send keepalive notification
00110       {
00111         if (Debug) std::cout << "InternalSubscription | KeepAliveCount " << KeepAliveCount << " is > than MaxKeepAliveCount " <<  Data.RevisedMaxKeepAliveCount << " sending publish event" << std::endl;
00112         return true;
00113       }
00114       ++KeepAliveCount;
00115       return false;
00116 
00117     }
00118 
00119     std::vector<PublishResult> InternalSubscription::PopPublishResult()
00120     {
00121       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00122 
00123       //std::cout << "PopPublishresult for subscription: " << Data.SubscriptionId << " with " << TriggeredDataChangeEvents.size() << " triggered items in queue" << std::endl;
00124       PublishResult result;
00125       result.SubscriptionId = Data.SubscriptionId;
00126       result.NotificationMessage.PublishTime = DateTime::Current();
00127 
00128       if ( ! TriggeredDataChangeEvents.empty() )
00129       {
00130         NotificationData data = GetNotificationData();
00131         result.NotificationMessage.NotificationData.push_back(data);
00132         result.Results.push_back(StatusCode::Good);
00133       }
00134           
00135       if ( ! TriggeredEvents.empty() )
00136       {
00137         if (Debug) { std::cout << "InternalSubcsription | Subscription " << Data.SubscriptionId << " has " << TriggeredEvents.size() << " events to send to client" << std::endl; }
00138         EventNotificationList notif;
00139         for ( TriggeredEvent ev: TriggeredEvents )
00140         {
00141           notif.Events.push_back(ev.Data);
00142         }
00143         TriggeredEvents.clear();
00144         NotificationData data(notif);
00145         result.NotificationMessage.NotificationData.push_back(data);
00146         result.Results.push_back(StatusCode::Good);
00147       }
00148 
00149 
00150       // FIXME: also add statuschange notification since they can be send in same result
00151       
00152       KeepAliveCount = 0;
00153       Startup = false;
00154 
00155       result.NotificationMessage.SequenceNumber = NotificationSequence;
00156       ++NotificationSequence;
00157       result.MoreNotifications = false;
00158       for (const PublishResult& res: NotAcknowledgedResults)
00159       {
00160         result.AvailableSequenceNumbers.push_back(res.NotificationMessage.SequenceNumber);
00161       }
00162       NotAcknowledgedResults.push_back(result);
00163       if (Debug) { std::cout << "InternalSubcsription | Sending Notification with " << result.NotificationMessage.NotificationData.size() << " notifications"  << std::endl; }
00164       std::vector<PublishResult> resultlist;
00165       resultlist.push_back(result);
00166 
00167       return resultlist;
00168     };
00169 
00170     RepublishResponse InternalSubscription::Republish(const RepublishParameters& params)
00171     {
00172       if (Debug) std::cout << "SubscriptionService| RepublishRequest for sequence: " << params.RetransmitSequenceNumber << std::endl;
00173       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00174 
00175       RepublishResponse response;
00176       for (const PublishResult& res: NotAcknowledgedResults)
00177       {
00178         if (res.NotificationMessage.SequenceNumber == params.RetransmitSequenceNumber)
00179         {
00180           response.NotificationMessage = res.NotificationMessage;
00181           return response;
00182         }
00183       }
00184       response.Header.ServiceResult = StatusCode::BadMessageNotAvailable;
00185       return response;
00186     }
00187 
00188     NotificationData InternalSubscription::GetNotificationData()
00189     {
00190       DataChangeNotification notification;
00191       for ( const TriggeredDataChange& event: TriggeredDataChangeEvents)
00192       {
00193         notification.Notification.push_back(event.Data);
00194       }
00195       TriggeredDataChangeEvents.clear();
00196       NotificationData data(notification);
00197       return data;
00198     }
00199 
00200     void InternalSubscription::NewAcknowlegment(const SubscriptionAcknowledgement& ack)
00201     {
00202       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00203 
00204       NotAcknowledgedResults.remove_if([&](PublishResult res){ return ack.SequenceNumber == res.NotificationMessage.SequenceNumber; });
00205     }
00206     
00207 
00208     MonitoredItemCreateResult InternalSubscription::CreateMonitoredItem(const MonitoredItemCreateRequest& request)
00209     {
00210       if (Debug) std::cout << "SubscriptionService| Creating monitored item." << std::endl;
00211       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00212 
00213       MonitoredItemCreateResult result;
00214       uint32_t callbackHandle = 0;
00215       result.MonitoredItemId = ++LastMonitoredItemId;
00216       if (request.ItemToMonitor.AttributeId == AttributeId::EventNotifier )
00217       {
00218         if (Debug) std::cout << "SubscriptionService| Subscribed o event notifier " << std::endl;
00219         //client want to subscribe to events
00220         //FIXME: check attribute EVENT notifier is set for the node
00221         MonitoredEvents[request.ItemToMonitor.NodeId] = result.MonitoredItemId;
00222       }
00223       else
00224       {
00225         if (Debug) std::cout << "SubscriptionService| Subscribing to data chanes in the address space." << std::endl;
00226         uint32_t id = result.MonitoredItemId;
00227         callbackHandle = AddressSpace.AddDataChangeCallback(request.ItemToMonitor.NodeId, request.ItemToMonitor.AttributeId, [this, id] (const OpcUa::NodeId& nodeId, OpcUa::AttributeId attr, const DataValue& value)
00228           {
00229             this->DataChangeCallback(id, value);
00230           });
00231 
00232         if (callbackHandle == 0)
00233         {
00234           if (Debug) std::cout << "SubscriptionService| ERROR: address returned zero handle." << std::endl;
00235           --LastMonitoredItemId; //revert increment 
00236           result.Status = OpcUa::StatusCode::BadNodeAttributesInvalid;
00237           return result;
00238         }
00239       }
00240       result.Status = OpcUa::StatusCode::Good;
00241       result.RevisedSamplingInterval = Data.RevisedPublishingInterval; //Force our own rate
00242       result.RevisedQueueSize = request.RequestedParameters.QueueSize; // We should check that value, maybe set to a default...
00243       result.FilterResult = request.RequestedParameters.Filter; //We can omit that one if we do not change anything in filter
00244       MonitoredDataChange mdata;
00245       mdata.Parameters = result;
00246       mdata.Mode = request.MonitoringMode;
00247       mdata.ClientHandle = request.RequestedParameters.ClientHandle;
00248       mdata.CallbackHandle = callbackHandle;
00249       mdata.MonitoredItemId = result.MonitoredItemId;
00250       MonitoredDataChanges[result.MonitoredItemId] = mdata;
00251       if (Debug) std::cout << "Created MonitoredItem with id: " << result.MonitoredItemId << " and client handle " << mdata.ClientHandle << std::endl;
00252       //Forcing event, 
00253       if (request.ItemToMonitor.AttributeId != AttributeId::EventNotifier )
00254       {
00255         TriggerDataChangeEvent(mdata, request.ItemToMonitor);
00256       }
00257 
00258       return result;
00259     }
00260 
00261     void InternalSubscription::TriggerDataChangeEvent(MonitoredDataChange monitoreditems, ReadValueId attrval)
00262     {
00263       if (Debug) { std::cout << "InternalSubcsription | Manual Trigger of DataChangeEvent for sub: " << Data.SubscriptionId << " and clienthandle: " << monitoreditems.ClientHandle << std::endl; }
00264       ReadParameters params;
00265       params.AttributesToRead.push_back(attrval);
00266       std::vector<DataValue> vals = AddressSpace.Read(params);
00267       
00268       TriggeredDataChange event;
00269       event.MonitoredItemId = monitoreditems.MonitoredItemId;
00270       event.Data.ClientHandle = monitoreditems.ClientHandle; 
00271       event.Data.Value = vals[0];
00272       TriggeredDataChangeEvents.push_back(event);
00273     }
00274 
00275     std::vector<StatusCode> InternalSubscription::DeleteMonitoredItemsIds(const std::vector<uint32_t>& monitoreditemsids)
00276     {
00277       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00278 
00279       std::vector<StatusCode> results;
00280       for (const uint32_t& handle: monitoreditemsids)
00281       {
00282         if (Debug) std::cout << "InternalSubcsription | Deleting Monitoreditemsid: " << handle << std::endl;
00283 
00284         if ( DeleteMonitoredEvent(handle) )
00285         {
00286           results.push_back(StatusCode::Good);
00287           continue;
00288         }
00289 
00290         if ( DeleteMonitoredDataChange(handle) )
00291         {
00292           results.push_back(StatusCode::Good);
00293           continue;
00294         }
00295 
00296         results.push_back(StatusCode::BadMonitoredItemIdInvalid);
00297 
00298       }
00299       return results;
00300     }
00301 
00302     bool InternalSubscription::DeleteMonitoredDataChange(uint32_t handle)
00303     {
00304         MonitoredDataChangeMap::iterator it = MonitoredDataChanges.find(handle);
00305         if ( it == MonitoredDataChanges.end() )
00306         {
00307           return false;
00308         }
00309         else
00310         {
00311           if (it->second.CallbackHandle != 0){ //if 0 this monitoreditem did not use callbacks
00312             AddressSpace.DeleteDataChangeCallback(it->second.CallbackHandle);
00313           }
00314           MonitoredDataChanges.erase(handle);
00315           //We remove you our monitoreditem, now empty events which are already triggered
00316           for(auto ev = TriggeredDataChangeEvents.begin(); ev != TriggeredDataChangeEvents.end();)
00317           {
00318             if(ev->MonitoredItemId == handle)
00319             {
00320               if (Debug) std::cout << "InternalSubscription | Remove triggeredEvent for monitoreditemid " << handle << std::endl;
00321               ev = TriggeredDataChangeEvents.erase(ev);
00322             }
00323             else
00324             {
00325               ++ev;
00326             }
00327           }
00328           return true;
00329         }
00330     }
00331 
00332     bool InternalSubscription::DeleteMonitoredEvent(uint32_t handle)
00333     {
00334        for (auto pair : MonitoredEvents)
00335         {
00336           if ( pair.second == handle )
00337           {
00338             MonitoredEvents.erase(pair.first);
00339             //We remove you our monitoreditem, now empty events which are already triggered
00340             for(auto ev = TriggeredEvents.begin(); ev != TriggeredEvents.end();)
00341             {
00342               if(ev->MonitoredItemId == handle)
00343               {
00344                 if (Debug) std::cout << "InternalSubscription | Remove triggeredEvent for monitoreditemid " << handle << std::endl;
00345                 ev = TriggeredEvents.erase(ev);
00346               }
00347               else
00348               {
00349                 ++ev;
00350               }
00351             }
00352             return true;
00353           }
00354         }
00355       return false;
00356     }
00357 
00358     void InternalSubscription::DataChangeCallback(const uint32_t& m_id, const DataValue& value)
00359     {
00360       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00361 
00362       TriggeredDataChange event;
00363       MonitoredDataChangeMap::iterator it_monitoreditem = MonitoredDataChanges.find(m_id);
00364       if ( it_monitoreditem == MonitoredDataChanges.end()) 
00365       {
00366         std::cout << "InternalSubcsription | DataChangeCallback called for unknown item" << std::endl;
00367         return ;
00368       }
00369 
00370       event.MonitoredItemId = it_monitoreditem->first;
00371       event.Data.ClientHandle = it_monitoreditem->second.ClientHandle; 
00372       event.Data.Value = value;
00373       if (Debug) { std::cout << "InternalSubcsription | Enqueued DataChange triggered item for sub: " << Data.SubscriptionId << " and clienthandle: " << event.Data.ClientHandle << std::endl; }
00374       TriggeredDataChangeEvents.push_back(event);
00375     }
00376 
00377     void InternalSubscription::TriggerEvent(NodeId node, Event event)
00378     {
00379       boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00380 
00381       MonitoredEventsMap::iterator it = MonitoredEvents.find(node);
00382       if ( it == MonitoredEvents.end() )
00383       {
00384         if (Debug) std::cout << "InternalSubcsription | Subscription: " << Data.SubscriptionId << " has no subcsription for this event" << std::endl;
00385         return;
00386       }
00387       lock.unlock();//Enqueue vill need to set a unique lock
00388       EnqueueEvent(it->second, event);
00389     }
00390 
00391     bool InternalSubscription::EnqueueEvent(uint32_t monitoreditemid, const Event& event)
00392     {
00393       if (Debug) { std::cout << "InternalSubcsription | Enqueing event to be send" << std::endl; }
00394       boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00395 
00396       if (Debug) { std::cout << "enqueueing event: " << event << std::endl;}
00397 
00398       //Find monitoredItem 
00399       std::map<uint32_t, MonitoredDataChange>::iterator mii_it =  MonitoredDataChanges.find( monitoreditemid );
00400       if  (mii_it == MonitoredDataChanges.end() ) 
00401       {
00402         if (Debug) std::cout << "InternalSubcsription | monitoreditem " << monitoreditemid << " is already deleted" << std::endl; 
00403         return false;
00404       }
00405           
00406       //Check filter against event data and create EventFieldList to send
00407       //FIXME: Here we should also check event agains WhereClause of filter
00408       EventFieldList fieldlist;
00409       fieldlist.ClientHandle = mii_it->second.ClientHandle; 
00410       fieldlist.EventFields = GetEventFields(mii_it->second.Parameters.FilterResult.Event, event);
00411       TriggeredEvent ev;
00412       ev.Data = fieldlist;
00413       ev.MonitoredItemId = monitoreditemid;
00414       TriggeredEvents.push_back(ev);
00415       return true;
00416     }
00417 
00418     std::vector<Variant> InternalSubscription::GetEventFields(const EventFilter& filter, const Event& event)
00419     {
00420       //Go through filter and add value og matches as in spec
00421       std::vector<Variant> fields;
00422       if(Debug) std::cout << "InternalSubscription | InternalGetEventField " << filter.SelectClauses.size() << std::endl;
00423       for (SimpleAttributeOperand sattr : filter.SelectClauses)
00424       {
00425         if(Debug) std::cout << "InternalSubscription | BrowsePath size " << sattr.BrowsePath.size() << std::endl;
00426         if ( sattr.BrowsePath.size() == 0 )
00427         {
00428           fields.push_back(event.GetValue(sattr.Attribute));
00429         }
00430         else
00431         {
00432           if(Debug) std::cout << "InternalSubscription | sending value for : " << sattr.BrowsePath[0] << std::endl;
00433           if ( sattr.BrowsePath[0] == QualifiedName("EventId", 0) )
00434           {
00435             fields.push_back(event.EventId);
00436           }
00437           else if ( sattr.BrowsePath[0] == QualifiedName("EventType", 0) )
00438           {
00439             fields.push_back(event.EventType);
00440           }
00441           else if ( sattr.BrowsePath[0] == QualifiedName("SourceNode", 0) )
00442           {
00443             fields.push_back(event.SourceNode);
00444           }
00445           else if ( sattr.BrowsePath[0] == QualifiedName("SourceName", 0) )
00446           {
00447             fields.push_back(event.SourceName);
00448           }
00449           else if ( sattr.BrowsePath[0] == QualifiedName("Message", 0) )
00450           {
00451             if (Debug) std::cout << "msg is: " << event.Message << std::endl;
00452             fields.push_back(event.Message);
00453           }
00454           else if ( sattr.BrowsePath[0] == QualifiedName("Severity", 0) )
00455           {
00456             fields.push_back(event.Severity);
00457           }
00458           else if ( sattr.BrowsePath[0] == QualifiedName("LocalTime", 0) )
00459           {
00460             fields.push_back(event.LocalTime);
00461           }
00462           else if ( sattr.BrowsePath[0] == QualifiedName("ReceiveTime", 0) )
00463           {
00464             fields.push_back(event.ReceiveTime);
00465           }
00466           else if ( sattr.BrowsePath[0] == QualifiedName("Time", 0) )
00467           {
00468             fields.push_back(event.Time);
00469           }
00470           else
00471           {
00472             fields.push_back(event.GetValue(sattr.BrowsePath));
00473           }
00474         }
00475       }
00476       return fields;
00477     }
00478 
00479 
00480   }
00481 }
00482 
00483 


ros_opcua_impl_freeopcua
Author(s): Denis Štogl
autogenerated on Sat Jun 8 2019 18:24:56