Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "subscription_service_internal.h"
00011
00012 #include <boost/thread/locks.hpp>
00013
00014
00015 namespace OpcUa
00016 {
00017 namespace Internal
00018 {
00019
00020 SubscriptionServiceInternal::SubscriptionServiceInternal(Server::AddressSpace::SharedPtr addressspace, boost::asio::io_service& ioService, bool debug)
00021 : io(ioService)
00022 , AddressSpace(addressspace)
00023 , Debug(debug)
00024 {
00025 }
00026
00027 SubscriptionServiceInternal::~SubscriptionServiceInternal()
00028 {
00029 }
00030
00031 Server::AddressSpace& SubscriptionServiceInternal::GetAddressSpace()
00032 {
00033 return *AddressSpace;
00034 }
00035
00036 boost::asio::io_service& SubscriptionServiceInternal::GetIOService()
00037 {
00038 return io;
00039 }
00040
00041 void SubscriptionServiceInternal::DeleteAllSubscriptions()
00042 {
00043 if (Debug) std::cout << "SubscriptionService | Deleting all subscriptions." << std::endl;
00044
00045 std::vector<uint32_t> ids(SubscriptionsMap.size());
00046 {
00047 boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00048 std::transform(SubscriptionsMap.begin(), SubscriptionsMap.end(), ids.begin(), [](const SubscriptionsIdMap::value_type& i){return i.first;});
00049 }
00050
00051 DeleteSubscriptions(ids);
00052 }
00053
00054 std::vector<StatusCode> SubscriptionServiceInternal::DeleteSubscriptions(const std::vector<uint32_t>& subscriptions)
00055 {
00056 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00057
00058 std::vector<StatusCode> result;
00059 for (const uint32_t& subid: subscriptions)
00060 {
00061 SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(subid);
00062 if ( itsub == SubscriptionsMap.end())
00063 {
00064 std::cout << "SubscriptionService | Error, got request to delete non existing Subscription: " << subid << std::endl;
00065 result.push_back(StatusCode::BadSubscriptionIdInvalid);
00066 }
00067 else
00068 {
00069 if (Debug) std::cout << "SubscriptionService | Deleting Subscription: " << subid << std::endl;
00070 itsub->second->Stop();
00071 SubscriptionsMap.erase(subid);
00072 result.push_back(StatusCode::Good);
00073 }
00074 }
00075 return result;
00076 }
00077
00078 SubscriptionData SubscriptionServiceInternal::CreateSubscription(const CreateSubscriptionRequest& request, std::function<void (PublishResult)> callback)
00079 {
00080 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00081
00082 SubscriptionData data;
00083 data.SubscriptionId = ++LastSubscriptionId;
00084 data.RevisedLifetimeCount = request.Parameters.RequestedLifetimeCount;
00085 data.RevisedPublishingInterval = request.Parameters.RequestedPublishingInterval;
00086 data.RevisedMaxKeepAliveCount = request.Parameters.RequestedMaxKeepAliveCount;
00087 if (Debug) std::cout << "SubscriptionService | Creating Subscription with Id: " << data.SubscriptionId << std::endl;
00088
00089 std::shared_ptr<InternalSubscription> sub(new InternalSubscription(*this, data, request.Header.SessionAuthenticationToken, callback, Debug));
00090 sub->Start();
00091 SubscriptionsMap[data.SubscriptionId] = sub;
00092 return data;
00093 }
00094
00095 std::vector<MonitoredItemCreateResult> SubscriptionServiceInternal::CreateMonitoredItems(const MonitoredItemsParameters& params)
00096 {
00097 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00098
00099 std::vector<MonitoredItemCreateResult> data;
00100
00101 SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(params.SubscriptionId);
00102 if ( itsub == SubscriptionsMap.end())
00103 {
00104 for (int j=0; j<(int)params.ItemsToCreate.size(); j++)
00105 {
00106 MonitoredItemCreateResult res;
00107 res.Status = StatusCode::BadSubscriptionIdInvalid;
00108 data.push_back(res);
00109 }
00110 return data;
00111 }
00112
00113 for (const MonitoredItemCreateRequest& req: params.ItemsToCreate)
00114 {
00115 MonitoredItemCreateResult result = itsub->second->CreateMonitoredItem(req);
00116 data.push_back(result);
00117 }
00118 return data;
00119
00120 }
00121
00122 std::vector<StatusCode> SubscriptionServiceInternal::DeleteMonitoredItems(const DeleteMonitoredItemsParameters& params)
00123 {
00124 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00125
00126 std::vector<StatusCode> results;
00127
00128 SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(params.SubscriptionId);
00129 if ( itsub == SubscriptionsMap.end())
00130 {
00131 for (int j=0; j<(int)params.MonitoredItemIds.size(); j++)
00132 {
00133 results.push_back(StatusCode::BadSubscriptionIdInvalid);
00134 }
00135 return results;
00136 }
00137
00138 results = itsub->second->DeleteMonitoredItemsIds(params.MonitoredItemIds);
00139 return results;
00140 }
00141
00142 void SubscriptionServiceInternal::Publish(const PublishRequest& request)
00143 {
00144 boost::unique_lock<boost::shared_mutex> lock(DbMutex);
00145
00146 if ( PublishRequestQueues[request.Header.SessionAuthenticationToken] < 100 )
00147 {
00148 PublishRequestQueues[request.Header.SessionAuthenticationToken] += 1;
00149 }
00150
00151
00152 for (SubscriptionAcknowledgement ack: request.SubscriptionAcknowledgements)
00153 {
00154 SubscriptionsIdMap::iterator sub_it = SubscriptionsMap.find(ack.SubscriptionId);
00155 if ( sub_it != SubscriptionsMap.end())
00156 {
00157 sub_it->second->NewAcknowlegment(ack);
00158 }
00159 }
00160 }
00161
00162 RepublishResponse SubscriptionServiceInternal::Republish(const RepublishParameters& params)
00163 {
00164 boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00165
00166 SubscriptionsIdMap::iterator sub_it = SubscriptionsMap.find(params.SubscriptionId);
00167 if ( sub_it == SubscriptionsMap.end())
00168 {
00169 RepublishResponse response;
00170 response.Header.ServiceResult = StatusCode::BadSubscriptionIdInvalid;
00171 return response;
00172 }
00173 return sub_it->second->Republish(params);
00174 }
00175
00176
00177 bool SubscriptionServiceInternal::PopPublishRequest(NodeId node)
00178 {
00179 std::map<NodeId, uint32_t>::iterator queue_it = PublishRequestQueues.find(node);
00180 if ( queue_it == PublishRequestQueues.end() )
00181 {
00182 std::cout << "SubscriptionService | Error request for publish queue for unknown session: " << node << " queue are available for: ";
00183 for ( auto i: PublishRequestQueues ){
00184 std::cout << " " << i.first ;
00185 }
00186 std::cout << std::endl;
00187 return false;
00188 }
00189 else
00190 {
00191 if ( queue_it->second == 0 )
00192 {
00193 std::cout << "SubscriptionService | Missing publish request, cannot send response for session: " << node << std::endl;
00194 return false;
00195 }
00196 else
00197 {
00198 --queue_it->second;
00199 return true;
00200 }
00201 }
00202 }
00203
00204 void SubscriptionServiceInternal::TriggerEvent(NodeId node, Event event)
00205 {
00206 boost::shared_lock<boost::shared_mutex> lock(DbMutex);
00207
00208
00209
00210 if ( event.EventId.Data.empty() )
00211 {
00212 event.EventId = GenerateEventId();
00213 }
00214
00215 for (auto sub : SubscriptionsMap)
00216 {
00217 sub.second->TriggerEvent(node, event);
00218 }
00219
00220 }
00221
00222 }
00223
00224 namespace Server
00225 {
00226
00227 SubscriptionService::UniquePtr CreateSubscriptionService(std::shared_ptr<Server::AddressSpace> addressspace, boost::asio::io_service& io, bool debug)
00228 {
00229 return SubscriptionService::UniquePtr(new Internal::SubscriptionServiceInternal(addressspace, io, debug));
00230 }
00231
00232 }
00233 }
00234