Go to the documentation of this file.00001 #include "internal_subscription.h"
00002
00003 #include <boost/thread/locks.hpp>
00004
00005 namespace OpcUa
00006 {
00007 namespace Internal
00008 {
00009
00010 InternalSubscription::InternalSubscription(SubscriptionServiceInternal& service, const SubscriptionData& data, const NodeId& SessionAuthenticationToken, std::function<void (PublishResult)> callback, bool debug)
00011 : Service(service)
00012 , AddressSpace(Service.GetAddressSpace())
00013 , Data(data)
00014 , CurrentSession(SessionAuthenticationToken)
00015 , Callback(callback)
00016 , io(service.GetIOService())
00017 , Timer(io, boost::posix_time::milliseconds(data.RevisedPublishingInterval))
00018 , LifeTimeCount(data.RevisedLifetimeCount)
00019 , Debug(debug)
00020 {
00021 }
00022
00023 void InternalSubscription::Start()
00024 {
00025 std::shared_ptr<InternalSubscription> self = shared_from_this();
00026 Timer.async_wait([self](const boost::system::error_code& error){ self->PublishResults(error); });
00027 }
00028
00029 InternalSubscription::~InternalSubscription()
00030 {
00031
00032 }
00033
00034 void InternalSubscription::Stop()
00035 {
00036 DeleteAllMonitoredItems();
00037 Timer.cancel();
00038 }
00039
00040 void InternalSubscription::DeleteAllMonitoredItems()
00041 {
00042 if (Debug) std::cout << "InternalSubscription | Deleting all monitoreditems" << std::endl;
00043 boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00044
00045 std::vector<uint32_t> handles;
00046 for (auto pair : MonitoredDataChanges)
00047 {
00048 handles.push_back(pair.first);
00049 }
00050 lock.unlock();
00051 DeleteMonitoredItemsIds(handles);
00052 }
00053
00054 bool InternalSubscription::HasExpired()
00055 {
00056 bool expired = KeepAliveCount > LifeTimeCount ;
00057 if ( expired )
00058 {
00059 if (Debug) { std::cout << "InternalSubscription | Subscription has expired " << KeepAliveCount << " " << LifeTimeCount << std::endl; }
00060 }
00061 return expired;
00062 }
00063
00064 void InternalSubscription::PublishResults(const boost::system::error_code& error)
00065 {
00066 if ( error )
00067 {
00068 if (Debug) std::cout << "InternalSubscription | Stopping subscription timer" << std::endl;
00069 return;
00070 }
00071 if ( HasExpired() )
00072 {
00073 if (Debug) { std::cout << "InternalSubscription | Subscription has expired" << std::endl; }
00074 return;
00075 }
00076
00077 if ( HasPublishResult() && Service.PopPublishRequest(CurrentSession) )
00078 {
00079
00080 std::vector<PublishResult> results = PopPublishResult();
00081 if (results.size() > 0 )
00082 {
00083 if (Debug) { std::cout << "InternalSubscription | Subscription has " << results.size() << " results, calling callback" << std::endl; }
00084 if ( Callback )
00085 {
00086 Callback(results[0]);
00087 }
00088 else
00089 {
00090 if (Debug) std::cout << "InternalSubcsription | No callback defined for this subscription" << std::endl;
00091 }
00092 }
00093 }
00094 TimerStopped = false;
00095 Timer.expires_at(Timer.expires_at() + boost::posix_time::milliseconds(Data.RevisedPublishingInterval));
00096 std::shared_ptr<InternalSubscription> self = shared_from_this();
00097 Timer.async_wait([self](const boost::system::error_code& error){ self->PublishResults(error); });
00098 }
00099
00100
00101 bool InternalSubscription::HasPublishResult()
00102 {
00103 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00104
00105 if ( Startup || ! TriggeredDataChangeEvents.empty() || ! TriggeredEvents.empty() )
00106 {
00107 return true;
00108 }
00109 if ( KeepAliveCount > Data.RevisedMaxKeepAliveCount )
00110 {
00111 if (Debug) std::cout << "InternalSubscription | KeepAliveCount " << KeepAliveCount << " is > than MaxKeepAliveCount " << Data.RevisedMaxKeepAliveCount << " sending publish event" << std::endl;
00112 return true;
00113 }
00114 ++KeepAliveCount;
00115 return false;
00116
00117 }
00118
00119 std::vector<PublishResult> InternalSubscription::PopPublishResult()
00120 {
00121 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00122
00123
00124 PublishResult result;
00125 result.SubscriptionId = Data.SubscriptionId;
00126 result.NotificationMessage.PublishTime = DateTime::Current();
00127
00128 if ( ! TriggeredDataChangeEvents.empty() )
00129 {
00130 NotificationData data = GetNotificationData();
00131 result.NotificationMessage.NotificationData.push_back(data);
00132 result.Results.push_back(StatusCode::Good);
00133 }
00134
00135 if ( ! TriggeredEvents.empty() )
00136 {
00137 if (Debug) { std::cout << "InternalSubcsription | Subscription " << Data.SubscriptionId << " has " << TriggeredEvents.size() << " events to send to client" << std::endl; }
00138 EventNotificationList notif;
00139 for ( TriggeredEvent ev: TriggeredEvents )
00140 {
00141 notif.Events.push_back(ev.Data);
00142 }
00143 TriggeredEvents.clear();
00144 NotificationData data(notif);
00145 result.NotificationMessage.NotificationData.push_back(data);
00146 result.Results.push_back(StatusCode::Good);
00147 }
00148
00149
00150
00151
00152 KeepAliveCount = 0;
00153 Startup = false;
00154
00155 result.NotificationMessage.SequenceNumber = NotificationSequence;
00156 ++NotificationSequence;
00157 result.MoreNotifications = false;
00158 for (const PublishResult& res: NotAcknowledgedResults)
00159 {
00160 result.AvailableSequenceNumbers.push_back(res.NotificationMessage.SequenceNumber);
00161 }
00162 NotAcknowledgedResults.push_back(result);
00163 if (Debug) { std::cout << "InternalSubcsription | Sending Notification with " << result.NotificationMessage.NotificationData.size() << " notifications" << std::endl; }
00164 std::vector<PublishResult> resultlist;
00165 resultlist.push_back(result);
00166
00167 return resultlist;
00168 };
00169
00170 RepublishResponse InternalSubscription::Republish(const RepublishParameters& params)
00171 {
00172 if (Debug) std::cout << "SubscriptionService| RepublishRequest for sequence: " << params.RetransmitSequenceNumber << std::endl;
00173 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00174
00175 RepublishResponse response;
00176 for (const PublishResult& res: NotAcknowledgedResults)
00177 {
00178 if (res.NotificationMessage.SequenceNumber == params.RetransmitSequenceNumber)
00179 {
00180 response.NotificationMessage = res.NotificationMessage;
00181 return response;
00182 }
00183 }
00184 response.Header.ServiceResult = StatusCode::BadMessageNotAvailable;
00185 return response;
00186 }
00187
00188 NotificationData InternalSubscription::GetNotificationData()
00189 {
00190 DataChangeNotification notification;
00191 for ( const TriggeredDataChange& event: TriggeredDataChangeEvents)
00192 {
00193 notification.Notification.push_back(event.Data);
00194 }
00195 TriggeredDataChangeEvents.clear();
00196 NotificationData data(notification);
00197 return data;
00198 }
00199
00200 void InternalSubscription::NewAcknowlegment(const SubscriptionAcknowledgement& ack)
00201 {
00202 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00203
00204 NotAcknowledgedResults.remove_if([&](PublishResult res){ return ack.SequenceNumber == res.NotificationMessage.SequenceNumber; });
00205 }
00206
00207
00208 MonitoredItemCreateResult InternalSubscription::CreateMonitoredItem(const MonitoredItemCreateRequest& request)
00209 {
00210 if (Debug) std::cout << "SubscriptionService| Creating monitored item." << std::endl;
00211 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00212
00213 MonitoredItemCreateResult result;
00214 uint32_t callbackHandle = 0;
00215 result.MonitoredItemId = ++LastMonitoredItemId;
00216 if (request.ItemToMonitor.AttributeId == AttributeId::EventNotifier )
00217 {
00218 if (Debug) std::cout << "SubscriptionService| Subscribed o event notifier " << std::endl;
00219
00220
00221 MonitoredEvents[request.ItemToMonitor.NodeId] = result.MonitoredItemId;
00222 }
00223 else
00224 {
00225 if (Debug) std::cout << "SubscriptionService| Subscribing to data chanes in the address space." << std::endl;
00226 uint32_t id = result.MonitoredItemId;
00227 callbackHandle = AddressSpace.AddDataChangeCallback(request.ItemToMonitor.NodeId, request.ItemToMonitor.AttributeId, [this, id] (const OpcUa::NodeId& nodeId, OpcUa::AttributeId attr, const DataValue& value)
00228 {
00229 this->DataChangeCallback(id, value);
00230 });
00231
00232 if (callbackHandle == 0)
00233 {
00234 if (Debug) std::cout << "SubscriptionService| ERROR: address returned zero handle." << std::endl;
00235 --LastMonitoredItemId;
00236 result.Status = OpcUa::StatusCode::BadNodeAttributesInvalid;
00237 return result;
00238 }
00239 }
00240 result.Status = OpcUa::StatusCode::Good;
00241 result.RevisedSamplingInterval = Data.RevisedPublishingInterval;
00242 result.RevisedQueueSize = request.RequestedParameters.QueueSize;
00243 result.FilterResult = request.RequestedParameters.Filter;
00244 MonitoredDataChange mdata;
00245 mdata.Parameters = result;
00246 mdata.Mode = request.MonitoringMode;
00247 mdata.ClientHandle = request.RequestedParameters.ClientHandle;
00248 mdata.CallbackHandle = callbackHandle;
00249 mdata.MonitoredItemId = result.MonitoredItemId;
00250 MonitoredDataChanges[result.MonitoredItemId] = mdata;
00251 if (Debug) std::cout << "Created MonitoredItem with id: " << result.MonitoredItemId << " and client handle " << mdata.ClientHandle << std::endl;
00252
00253 if (request.ItemToMonitor.AttributeId != AttributeId::EventNotifier )
00254 {
00255 TriggerDataChangeEvent(mdata, request.ItemToMonitor);
00256 }
00257
00258 return result;
00259 }
00260
00261 void InternalSubscription::TriggerDataChangeEvent(MonitoredDataChange monitoreditems, ReadValueId attrval)
00262 {
00263 if (Debug) { std::cout << "InternalSubcsription | Manual Trigger of DataChangeEvent for sub: " << Data.SubscriptionId << " and clienthandle: " << monitoreditems.ClientHandle << std::endl; }
00264 ReadParameters params;
00265 params.AttributesToRead.push_back(attrval);
00266 std::vector<DataValue> vals = AddressSpace.Read(params);
00267
00268 TriggeredDataChange event;
00269 event.MonitoredItemId = monitoreditems.MonitoredItemId;
00270 event.Data.ClientHandle = monitoreditems.ClientHandle;
00271 event.Data.Value = vals[0];
00272 TriggeredDataChangeEvents.push_back(event);
00273 }
00274
00275 std::vector<StatusCode> InternalSubscription::DeleteMonitoredItemsIds(const std::vector<uint32_t>& monitoreditemsids)
00276 {
00277 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00278
00279 std::vector<StatusCode> results;
00280 for (const uint32_t& handle: monitoreditemsids)
00281 {
00282 if (Debug) std::cout << "InternalSubcsription | Deleting Monitoreditemsid: " << handle << std::endl;
00283
00284 if ( DeleteMonitoredEvent(handle) )
00285 {
00286 results.push_back(StatusCode::Good);
00287 continue;
00288 }
00289
00290 if ( DeleteMonitoredDataChange(handle) )
00291 {
00292 results.push_back(StatusCode::Good);
00293 continue;
00294 }
00295
00296 results.push_back(StatusCode::BadMonitoredItemIdInvalid);
00297
00298 }
00299 return results;
00300 }
00301
00302 bool InternalSubscription::DeleteMonitoredDataChange(uint32_t handle)
00303 {
00304 MonitoredDataChangeMap::iterator it = MonitoredDataChanges.find(handle);
00305 if ( it == MonitoredDataChanges.end() )
00306 {
00307 return false;
00308 }
00309 else
00310 {
00311 if (it->second.CallbackHandle != 0){
00312 AddressSpace.DeleteDataChangeCallback(it->second.CallbackHandle);
00313 }
00314 MonitoredDataChanges.erase(handle);
00315
00316 for(auto ev = TriggeredDataChangeEvents.begin(); ev != TriggeredDataChangeEvents.end();)
00317 {
00318 if(ev->MonitoredItemId == handle)
00319 {
00320 if (Debug) std::cout << "InternalSubscription | Remove triggeredEvent for monitoreditemid " << handle << std::endl;
00321 ev = TriggeredDataChangeEvents.erase(ev);
00322 }
00323 else
00324 {
00325 ++ev;
00326 }
00327 }
00328 return true;
00329 }
00330 }
00331
00332 bool InternalSubscription::DeleteMonitoredEvent(uint32_t handle)
00333 {
00334 for (auto pair : MonitoredEvents)
00335 {
00336 if ( pair.second == handle )
00337 {
00338 MonitoredEvents.erase(pair.first);
00339
00340 for(auto ev = TriggeredEvents.begin(); ev != TriggeredEvents.end();)
00341 {
00342 if(ev->MonitoredItemId == handle)
00343 {
00344 if (Debug) std::cout << "InternalSubscription | Remove triggeredEvent for monitoreditemid " << handle << std::endl;
00345 ev = TriggeredEvents.erase(ev);
00346 }
00347 else
00348 {
00349 ++ev;
00350 }
00351 }
00352 return true;
00353 }
00354 }
00355 return false;
00356 }
00357
00358 void InternalSubscription::DataChangeCallback(const uint32_t& m_id, const DataValue& value)
00359 {
00360 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00361
00362 TriggeredDataChange event;
00363 MonitoredDataChangeMap::iterator it_monitoreditem = MonitoredDataChanges.find(m_id);
00364 if ( it_monitoreditem == MonitoredDataChanges.end())
00365 {
00366 std::cout << "InternalSubcsription | DataChangeCallback called for unknown item" << std::endl;
00367 return ;
00368 }
00369
00370 event.MonitoredItemId = it_monitoreditem->first;
00371 event.Data.ClientHandle = it_monitoreditem->second.ClientHandle;
00372 event.Data.Value = value;
00373 if (Debug) { std::cout << "InternalSubcsription | Enqueued DataChange triggered item for sub: " << Data.SubscriptionId << " and clienthandle: " << event.Data.ClientHandle << std::endl; }
00374 TriggeredDataChangeEvents.push_back(event);
00375 }
00376
00377 void InternalSubscription::TriggerEvent(NodeId node, Event event)
00378 {
00379 boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00380
00381 MonitoredEventsMap::iterator it = MonitoredEvents.find(node);
00382 if ( it == MonitoredEvents.end() )
00383 {
00384 if (Debug) std::cout << "InternalSubcsription | Subscription: " << Data.SubscriptionId << " has no subcsription for this event" << std::endl;
00385 return;
00386 }
00387 lock.unlock();
00388 EnqueueEvent(it->second, event);
00389 }
00390
00391 bool InternalSubscription::EnqueueEvent(uint32_t monitoreditemid, const Event& event)
00392 {
00393 if (Debug) { std::cout << "InternalSubcsription | Enqueing event to be send" << std::endl; }
00394 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00395
00396 if (Debug) { std::cout << "enqueueing event: " << event << std::endl;}
00397
00398
00399 std::map<uint32_t, MonitoredDataChange>::iterator mii_it = MonitoredDataChanges.find( monitoreditemid );
00400 if (mii_it == MonitoredDataChanges.end() )
00401 {
00402 if (Debug) std::cout << "InternalSubcsription | monitoreditem " << monitoreditemid << " is already deleted" << std::endl;
00403 return false;
00404 }
00405
00406
00407
00408 EventFieldList fieldlist;
00409 fieldlist.ClientHandle = mii_it->second.ClientHandle;
00410 fieldlist.EventFields = GetEventFields(mii_it->second.Parameters.FilterResult.Event, event);
00411 TriggeredEvent ev;
00412 ev.Data = fieldlist;
00413 ev.MonitoredItemId = monitoreditemid;
00414 TriggeredEvents.push_back(ev);
00415 return true;
00416 }
00417
00418 std::vector<Variant> InternalSubscription::GetEventFields(const EventFilter& filter, const Event& event)
00419 {
00420
00421 std::vector<Variant> fields;
00422 if(Debug) std::cout << "InternalSubscription | InternalGetEventField " << filter.SelectClauses.size() << std::endl;
00423 for (SimpleAttributeOperand sattr : filter.SelectClauses)
00424 {
00425 if(Debug) std::cout << "InternalSubscription | BrowsePath size " << sattr.BrowsePath.size() << std::endl;
00426 if ( sattr.BrowsePath.size() == 0 )
00427 {
00428 fields.push_back(event.GetValue(sattr.Attribute));
00429 }
00430 else
00431 {
00432 if(Debug) std::cout << "InternalSubscription | sending value for : " << sattr.BrowsePath[0] << std::endl;
00433 if ( sattr.BrowsePath[0] == QualifiedName("EventId", 0) )
00434 {
00435 fields.push_back(event.EventId);
00436 }
00437 else if ( sattr.BrowsePath[0] == QualifiedName("EventType", 0) )
00438 {
00439 fields.push_back(event.EventType);
00440 }
00441 else if ( sattr.BrowsePath[0] == QualifiedName("SourceNode", 0) )
00442 {
00443 fields.push_back(event.SourceNode);
00444 }
00445 else if ( sattr.BrowsePath[0] == QualifiedName("SourceName", 0) )
00446 {
00447 fields.push_back(event.SourceName);
00448 }
00449 else if ( sattr.BrowsePath[0] == QualifiedName("Message", 0) )
00450 {
00451 if (Debug) std::cout << "msg is: " << event.Message << std::endl;
00452 fields.push_back(event.Message);
00453 }
00454 else if ( sattr.BrowsePath[0] == QualifiedName("Severity", 0) )
00455 {
00456 fields.push_back(event.Severity);
00457 }
00458 else if ( sattr.BrowsePath[0] == QualifiedName("LocalTime", 0) )
00459 {
00460 fields.push_back(event.LocalTime);
00461 }
00462 else if ( sattr.BrowsePath[0] == QualifiedName("ReceiveTime", 0) )
00463 {
00464 fields.push_back(event.ReceiveTime);
00465 }
00466 else if ( sattr.BrowsePath[0] == QualifiedName("Time", 0) )
00467 {
00468 fields.push_back(event.Time);
00469 }
00470 else
00471 {
00472 fields.push_back(event.GetValue(sattr.BrowsePath));
00473 }
00474 }
00475 }
00476 return fields;
00477 }
00478
00479
00480 }
00481 }
00482
00483