00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "opc_tcp_processor.h"
00012
00013 #include "opcua_protocol.h"
00014
00015 #include <opc/common/uri_facade.h>
00016 #include <opc/ua/connection_listener.h>
00017 #include <opc/ua/protocol/binary/common.h>
00018 #include <opc/ua/protocol/binary/stream.h>
00019 #include <opc/ua/protocol/input_from_buffer.h>
00020 #include <opc/ua/protocol/monitored_items.h>
00021 #include <opc/ua/protocol/object_ids.h>
00022 #include <opc/ua/protocol/secure_channel.h>
00023 #include <opc/ua/protocol/session.h>
00024 #include <opc/ua/protocol/status_codes.h>
00025 #include <opc/ua/protocol/string_utils.h>
00026 #include <opc/ua/server/addons/endpoints_services.h>
00027 #include <opc/ua/server/addons/opcua_protocol.h>
00028 #include <opc/ua/server/addons/services_registry.h>
00029
00030 #include <boost/thread/locks.hpp>
00031 #include <chrono>
00032 #include <iostream>
00033 #include <list>
00034 #include <mutex>
00035 #include <stdexcept>
00036 #include <sstream>
00037 #include <queue>
00038
00039
00040 namespace OpcUa
00041 {
00042 namespace Server
00043 {
00044
00045 using namespace OpcUa::Binary;
00046
00047 OpcTcpMessages::OpcTcpMessages(std::shared_ptr<OpcUa::Services> computer, OpcUa::OutputChannel& outputChannel, bool debug)
00048 : Server(computer)
00049 , OutputStream(outputChannel)
00050 , Debug(debug)
00051 , ChannelId(1)
00052 , TokenId(2)
00053 , SessionId(GenerateSessionId())
00054 , SequenceNb(0)
00055 {
00056 std::cout << "opc_tcp_processor| Debug is " << Debug << std::endl;
00057 std::cout << "opc_tcp_processor| SessionId is " << Debug << std::endl;
00058 }
00059
00060
00061 OpcTcpMessages::~OpcTcpMessages()
00062 {
00063
00064 try
00065 {
00066 DeleteAllSubscriptions();
00067 }
00068 catch (const std::exception& exc)
00069 {
00070 std::cerr << "Error during stopping OpcTcpMessages. " << exc.what() <<std::endl;
00071 }
00072 }
00073
00074 bool OpcTcpMessages::ProcessMessage(MessageType msgType, IStreamBinary& iStream)
00075 {
00076 boost::unique_lock<boost::shared_mutex> lock(ProcessMutex);
00077
00078 switch (msgType)
00079 {
00080 case MT_HELLO:
00081 {
00082 if (Debug) std::clog << "opc_tcp_processor| Accepted hello message." << std::endl;
00083 HelloClient(iStream, OutputStream);
00084 break;
00085 }
00086
00087
00088 case MT_SECURE_OPEN:
00089 {
00090 if (Debug) std::clog << "opc_tcp_processor| Opening secure channel." << std::endl;
00091 OpenChannel(iStream, OutputStream);
00092 break;
00093 }
00094
00095 case MT_SECURE_CLOSE:
00096 {
00097 if (Debug) std::clog << "opc_tcp_processor| Closing secure channel." << std::endl;
00098 CloseChannel(iStream);
00099 return false;
00100 }
00101
00102 case MT_SECURE_MESSAGE:
00103 {
00104 if (Debug) std::clog << "opc_tcp_processor| Processing secure message." << std::endl;
00105 ProcessRequest(iStream, OutputStream);
00106 break;
00107 }
00108
00109 case MT_ACKNOWLEDGE:
00110 {
00111 if (Debug) std::clog << "opc_tcp_processor| Received acknowledge from client. This should not have happend..." << std::endl;
00112 throw std::logic_error("Thank to client about acknowledge.");
00113 }
00114
00115 case MT_ERROR:
00116 {
00117 if (Debug) std::clog << "opc_tcp_processor| There is an error happend in the client!" << std::endl;
00118 throw std::logic_error("It is very nice get to know server about error in the client.");
00119 }
00120
00121 default:
00122 {
00123 if (Debug) std::clog << "opc_tcp_processor| Unknown message type '" << msgType << "' received!" << std::endl;
00124 throw std::logic_error("Invalid message type received.");
00125 }
00126 }
00127 return true;
00128 }
00129
00130 void OpcTcpMessages::ForwardPublishResponse(const PublishResult result)
00131 {
00132 boost::unique_lock<boost::shared_mutex> lock(ProcessMutex);
00133
00134 if (Debug) std::clog << "opc_tcp_processor| Sending PublishResult to client!" << std::endl;
00135 if ( PublishRequestQueue.empty() )
00136 {
00137 std::cerr << "Error trying to send publish response while we do not have data from a PublishRequest" << std::endl;
00138 return;
00139 }
00140 PublishRequestElement requestData = PublishRequestQueue.front();
00141 PublishRequestQueue.pop();
00142
00143 PublishResponse response;
00144
00145 FillResponseHeader(requestData.requestHeader, response.Header);
00146 response.Parameters = result;
00147
00148 requestData.sequence.SequenceNumber = ++SequenceNb;
00149
00150 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00151 secureHeader.AddSize(RawSize(requestData.algorithmHeader));
00152 secureHeader.AddSize(RawSize(requestData.sequence));
00153 secureHeader.AddSize(RawSize(response));
00154 if (Debug) {
00155 std::cout << "opc_tcp_processor| Sedning publishResponse with " << response.Parameters.NotificationMessage.NotificationData.size() << " PublishResults" << std::endl;
00156 }
00157 OutputStream << secureHeader << requestData.algorithmHeader << requestData.sequence << response << flush;
00158 };
00159
00160 void OpcTcpMessages::HelloClient(IStreamBinary& istream, OStreamBinary& ostream)
00161 {
00162 using namespace OpcUa::Binary;
00163
00164 if (Debug) std::clog << "opc_tcp_processor| Reading hello message." << std::endl;
00165 Hello hello;
00166 istream >> hello;
00167
00168 Acknowledge ack;
00169 ack.ReceiveBufferSize = hello.ReceiveBufferSize;
00170 ack.SendBufferSize = hello.SendBufferSize;
00171 ack.MaxMessageSize = hello.MaxMessageSize;
00172 ack.MaxChunkCount = 1;
00173
00174 Header ackHeader(MT_ACKNOWLEDGE, CHT_SINGLE);
00175 ackHeader.AddSize(RawSize(ack));
00176 if (Debug) std::clog << "opc_tcp_processor| Sending answer to client." << std::endl;
00177 ostream << ackHeader << ack << flush;
00178 }
00179
00180 void OpcTcpMessages::OpenChannel(IStreamBinary& istream, OStreamBinary& ostream)
00181 {
00182 uint32_t channelId = 0;
00183 istream >> channelId;
00184 AsymmetricAlgorithmHeader algorithmHeader;
00185 istream >> algorithmHeader;
00186
00187 if (algorithmHeader.SecurityPolicyUri != "http://opcfoundation.org/UA/SecurityPolicy#None")
00188 {
00189 throw std::logic_error(std::string("Client want to create secure channel with unsupported policy '") + algorithmHeader.SecurityPolicyUri + std::string("'"));
00190 }
00191
00192 SequenceHeader sequence;
00193 istream >> sequence;
00194
00195 OpenSecureChannelRequest request;
00196 istream >> request;
00197
00198 if (request.Parameters.SecurityMode != MessageSecurityMode::None)
00199 {
00200 throw std::logic_error("Unsupported security mode.");
00201 }
00202
00203 if (request.Parameters.RequestType == SecurityTokenRequestType::Renew)
00204 {
00205
00206 ++TokenId;
00207 }
00208
00209 sequence.SequenceNumber = ++SequenceNb;
00210
00211 OpenSecureChannelResponse response;
00212 FillResponseHeader(request.Header, response.Header);
00213 response.ChannelSecurityToken.SecureChannelId = ChannelId;
00214 response.ChannelSecurityToken.TokenId = TokenId;
00215 response.ChannelSecurityToken.CreatedAt = OpcUa::DateTime::Current();
00216 response.ChannelSecurityToken.RevisedLifetime = request.Parameters.RequestLifeTime;
00217
00218 SecureHeader responseHeader(MT_SECURE_OPEN, CHT_SINGLE, ChannelId);
00219 responseHeader.AddSize(RawSize(algorithmHeader));
00220 responseHeader.AddSize(RawSize(sequence));
00221 responseHeader.AddSize(RawSize(response));
00222 ostream << responseHeader << algorithmHeader << sequence << response << flush;
00223 }
00224
00225 void OpcTcpMessages::CloseChannel(IStreamBinary& istream)
00226 {
00227 uint32_t channelId = 0;
00228 istream >> channelId;
00229
00230 SymmetricAlgorithmHeader algorithmHeader;
00231 istream >> algorithmHeader;
00232
00233 SequenceHeader sequence;
00234 istream >> sequence;
00235
00236 CloseSecureChannelRequest request;
00237 istream >> request;
00238 }
00239
00240 void OpcTcpMessages::ProcessRequest(IStreamBinary& istream, OStreamBinary& ostream)
00241 {
00242 uint32_t channelId = 0;
00243 istream >> channelId;
00244
00245 SymmetricAlgorithmHeader algorithmHeader;
00246 istream >> algorithmHeader;
00247
00248 SequenceHeader sequence;
00249 istream >> sequence;
00250
00251 NodeId typeId;
00252 istream >> typeId;
00253
00254 RequestHeader requestHeader;
00255 istream >> requestHeader;
00256
00257 sequence.SequenceNumber = ++SequenceNb;
00258
00259
00260
00261
00262
00263
00264
00265
00266 const OpcUa::MessageId message = GetMessageId(typeId);
00267 switch (message)
00268 {
00269 case OpcUa::GET_ENDPOINTS_REQUEST:
00270 {
00271 if (Debug) std::clog << "opc_tcp_processor| Processing get endpoints request." << std::endl;
00272 GetEndpointsParameters filter;
00273 istream >> filter;
00274
00275 GetEndpointsResponse response;
00276 FillResponseHeader(requestHeader, response.Header);
00277 response.Endpoints = Server->Endpoints()->GetEndpoints(filter);
00278
00279 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00280 secureHeader.AddSize(RawSize(algorithmHeader));
00281 secureHeader.AddSize(RawSize(sequence));
00282 secureHeader.AddSize(RawSize(response));
00283 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00284 return;
00285 }
00286
00287 case OpcUa::FIND_ServerS_REQUEST:
00288 {
00289 if (Debug) std::clog << "opc_tcp_processor| Processing 'Find Servers' request." << std::endl;
00290 FindServersParameters params;
00291 istream >> params;
00292
00293 FindServersResponse response;
00294 FillResponseHeader(requestHeader, response.Header);
00295 response.Data.Descriptions = Server->Endpoints()->FindServers(params);
00296
00297 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00298 secureHeader.AddSize(RawSize(algorithmHeader));
00299 secureHeader.AddSize(RawSize(sequence));
00300 secureHeader.AddSize(RawSize(response));
00301 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00302 return;
00303 }
00304
00305 case OpcUa::BROWSE_REQUEST:
00306 {
00307 if (Debug) std::clog << "opc_tcp_processor| Processing browse request." << std::endl;
00308 NodesQuery query;
00309 istream >> query;
00310
00311 BrowseResponse response;
00312 response.Results = Server->Views()->Browse(query);
00313
00314 FillResponseHeader(requestHeader, response.Header);
00315
00316 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00317 secureHeader.AddSize(RawSize(algorithmHeader));
00318 secureHeader.AddSize(RawSize(sequence));
00319 secureHeader.AddSize(RawSize(response));
00320 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00321 return;
00322 }
00323
00324 case OpcUa::READ_REQUEST:
00325 {
00326 ReadParameters params;
00327 istream >> params;
00328
00329 if (Debug)
00330 {
00331 std::clog << "opc_tcp_processor| Processing read request for Node:";
00332 for (ReadValueId id : params.AttributesToRead)
00333 {
00334 std::clog << "opc_tcp_processor| " << id.NodeId;
00335 }
00336 std::cout << std::endl;
00337 }
00338
00339 ReadResponse response;
00340 FillResponseHeader(requestHeader, response.Header);
00341 std::vector<DataValue> values;
00342 if (std::shared_ptr<OpcUa::AttributeServices> service = Server->Attributes())
00343 {
00344 values = service->Read(params);
00345 }
00346 else
00347 {
00348 for (auto attribId : params.AttributesToRead)
00349 {
00350 DataValue value;
00351 value.Encoding = DATA_VALUE_STATUS_CODE;
00352 value.Status = OpcUa::StatusCode::BadNotImplemented;
00353 values.push_back(value);
00354 }
00355 }
00356 response.Results = values;
00357
00358 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00359 secureHeader.AddSize(RawSize(algorithmHeader));
00360 secureHeader.AddSize(RawSize(sequence));
00361 secureHeader.AddSize(RawSize(response));
00362 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00363
00364 return;
00365 }
00366
00367 case OpcUa::WRITE_REQUEST:
00368 {
00369 if (Debug) std::clog << "opc_tcp_processor| Processing write request." << std::endl;
00370 WriteParameters params;
00371 istream >> params;
00372
00373 WriteResponse response;
00374 FillResponseHeader(requestHeader, response.Header);
00375 std::vector<DataValue> values;
00376 if (std::shared_ptr<OpcUa::AttributeServices> service = Server->Attributes())
00377 {
00378 response.Results = service->Write(params.NodesToWrite);
00379 }
00380 else
00381 {
00382 response.Results = std::vector<StatusCode>(params.NodesToWrite.size(), OpcUa::StatusCode::BadNotImplemented);
00383 }
00384
00385 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00386 secureHeader.AddSize(RawSize(algorithmHeader));
00387 secureHeader.AddSize(RawSize(sequence));
00388 secureHeader.AddSize(RawSize(response));
00389 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00390
00391 return;
00392 }
00393
00394 case TRANSLATE_BROWSE_PATHS_TO_NODE_IdS_REQUEST:
00395 {
00396 if (Debug) std::clog << "opc_tcp_processor| Processing 'Translate Browse Paths To Node Ids' request." << std::endl;
00397 TranslateBrowsePathsParameters params;
00398 istream >> params;
00399
00400 if (Debug)
00401 {
00402 for ( BrowsePath path : params.BrowsePaths)
00403 {
00404 std::cout << "opc_tcp_processor| Requested path is: " << path.StartingNode << " : " ;
00405 for ( RelativePathElement el : path.Path.Elements)
00406 {
00407 std::cout << "/" << el.TargetName ;
00408 }
00409 std::cout << std::endl;
00410 }
00411 }
00412
00413 std::vector<BrowsePathResult> result = Server->Views()->TranslateBrowsePathsToNodeIds(params);
00414
00415 if (Debug)
00416 {
00417 for (BrowsePathResult res: result)
00418 {
00419 std::cout << "opc_tcp_processor| Result of browsePath is: " << (uint32_t) res.Status << ". Target is: ";
00420 for ( BrowsePathTarget path : res.Targets)
00421 {
00422 std::cout << path.Node ;
00423 }
00424 std::cout << std::endl;
00425 }
00426 }
00427
00428 TranslateBrowsePathsToNodeIdsResponse response;
00429 FillResponseHeader(requestHeader, response.Header);
00430 response.Result.Paths = result;
00431 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00432 secureHeader.AddSize(RawSize(algorithmHeader));
00433 secureHeader.AddSize(RawSize(sequence));
00434 secureHeader.AddSize(RawSize(response));
00435
00436 if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Translate Browse Paths To Node Ids' request." << std::endl;
00437 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00438 return;
00439 }
00440
00441
00442 case CREATE_SESSION_REQUEST:
00443 {
00444 if (Debug) std::clog << "opc_tcp_processor| Processing create session request." << std::endl;
00445 CreateSessionParameters params;
00446 istream >> params;
00447
00448 CreateSessionResponse response;
00449 FillResponseHeader(requestHeader, response.Header);
00450
00451 response.Parameters.SessionId = SessionId;
00452 response.Parameters.AuthenticationToken = SessionId;
00453 response.Parameters.RevisedSessionTimeout = params.RequestedSessionTimeout;
00454 response.Parameters.MaxRequestMessageSize = 65536;
00455 GetEndpointsParameters epf;
00456 response.Parameters.ServerEndpoints = Server->Endpoints()->GetEndpoints(epf);
00457
00458
00459 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00460 secureHeader.AddSize(RawSize(algorithmHeader));
00461 secureHeader.AddSize(RawSize(sequence));
00462 secureHeader.AddSize(RawSize(response));
00463 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00464
00465 return;
00466 }
00467 case ACTIVATE_SESSION_REQUEST:
00468 {
00469 if (Debug) std::clog << "opc_tcp_processor| Processing activate session request." << std::endl;
00470 ActivateSessionParameters params;
00471 istream >> params;
00472
00473 ActivateSessionResponse response;
00474 FillResponseHeader(requestHeader, response.Header);
00475
00476 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00477 secureHeader.AddSize(RawSize(algorithmHeader));
00478 secureHeader.AddSize(RawSize(sequence));
00479 secureHeader.AddSize(RawSize(response));
00480 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00481 return;
00482 }
00483
00484 case CLOSE_SESSION_REQUEST:
00485 {
00486 if (Debug) std::clog << "opc_tcp_processor| Processing close session request." << std::endl;
00487 bool deleteSubscriptions = false;
00488 istream >> deleteSubscriptions;
00489
00490 if (deleteSubscriptions)
00491 {
00492 DeleteAllSubscriptions();
00493 }
00494
00495 CloseSessionResponse response;
00496 FillResponseHeader(requestHeader, response.Header);
00497
00498 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00499 secureHeader.AddSize(RawSize(algorithmHeader));
00500 secureHeader.AddSize(RawSize(sequence));
00501 secureHeader.AddSize(RawSize(response));
00502 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00503 if (Debug) std::clog << "opc_tcp_processor| Session Closed " << std::endl;
00504 return;
00505 }
00506
00507 case CREATE_SUBSCRIPTION_REQUEST:
00508 {
00509 if (Debug) std::clog << "opc_tcp_processor| Processing create subscription request." << std::endl;
00510 CreateSubscriptionRequest request;
00511 istream >> request.Parameters;
00512 request.Header = requestHeader;
00513
00514 CreateSubscriptionResponse response;
00515 FillResponseHeader(requestHeader, response.Header);
00516
00517 response.Data = Server->Subscriptions()->CreateSubscription(request, [this](PublishResult i){
00518 try
00519 {
00520 this->ForwardPublishResponse(i);
00521 }
00522 catch (std::exception& ex)
00523 {
00524
00525 std::cerr << "Error forwarding publishResult to client: " << ex.what() << std::endl;
00526 }
00527 });
00528
00529 Subscriptions.push_back(response.Data.SubscriptionId);
00530
00531 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00532 secureHeader.AddSize(RawSize(algorithmHeader));
00533 secureHeader.AddSize(RawSize(sequence));
00534 secureHeader.AddSize(RawSize(response));
00535 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00536 return;
00537 }
00538
00539 case DELETE_SUBSCRIPTION_REQUEST:
00540 {
00541 if (Debug) std::clog << "opc_tcp_processor| Processing delete subscription request." << std::endl;
00542 std::vector<uint32_t> ids;
00543 istream >> ids;
00544
00545 DeleteSubscriptions(ids);
00546
00547 DeleteSubscriptionsResponse response;
00548 FillResponseHeader(requestHeader, response.Header);
00549
00550 response.Results = Server->Subscriptions()->DeleteSubscriptions(ids);
00551
00552 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00553 secureHeader.AddSize(RawSize(algorithmHeader));
00554 secureHeader.AddSize(RawSize(sequence));
00555 secureHeader.AddSize(RawSize(response));
00556
00557 if (Debug) std::clog << "opc_tcp_processor| Sending response to Delete Subscription Request." << std::endl;
00558 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00559 return;
00560 }
00561
00562 case CREATE_MONITORED_ITEMS_REQUEST:
00563 {
00564 if (Debug) std::clog << "opc_tcp_processor| Processing 'Create Monitored Items' request." << std::endl;
00565 MonitoredItemsParameters params;
00566 istream >> params;
00567
00568 CreateMonitoredItemsResponse response;
00569
00570 response.Results = Server->Subscriptions()->CreateMonitoredItems(params);
00571
00572 FillResponseHeader(requestHeader, response.Header);
00573 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00574 secureHeader.AddSize(RawSize(algorithmHeader));
00575 secureHeader.AddSize(RawSize(sequence));
00576 secureHeader.AddSize(RawSize(response));
00577
00578 if (Debug) std::clog << "opc_tcp_processor| Sending response to Create Monitored Items Request." << std::endl;
00579 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00580 return;
00581 }
00582
00583 case DELETE_MONITORED_ITEMS_REQUEST:
00584 {
00585 if (Debug) std::clog << "opc_tcp_processor| Processing 'Delete Monitored Items' request." << std::endl;
00586 DeleteMonitoredItemsParameters params;
00587 istream >> params;
00588
00589 DeleteMonitoredItemsResponse response;
00590
00591 response.Results = Server->Subscriptions()->DeleteMonitoredItems(params);
00592
00593 FillResponseHeader(requestHeader, response.Header);
00594 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00595 secureHeader.AddSize(RawSize(algorithmHeader));
00596 secureHeader.AddSize(RawSize(sequence));
00597 secureHeader.AddSize(RawSize(response));
00598
00599 if (Debug) std::clog << "opc_tcp_processor| Sending response to Delete Monitored Items Request." << std::endl;
00600 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00601 return;
00602 }
00603
00604 case PUBLISH_REQUEST:
00605 {
00606 if (Debug) std::clog << "opc_tcp_processor| Processing 'Publish' request." << std::endl;
00607 PublishRequest request;
00608 request.Header = requestHeader;
00609 istream >> request.SubscriptionAcknowledgements;
00610
00611 PublishRequestElement data;
00612 data.sequence = sequence;
00613 data.algorithmHeader = algorithmHeader;
00614 data.requestHeader = requestHeader;
00615 PublishRequestQueue.push(data);
00616 Server->Subscriptions()->Publish(request);
00617
00618 --SequenceNb;
00619
00620 return;
00621 }
00622
00623 case SET_PUBLISHING_MODE_REQUEST:
00624 {
00625 if (Debug) std::clog << "opc_tcp_processor| Processing 'Set Publishing Mode' request." << std::endl;
00626 PublishingModeParameters params;
00627 istream >> params;
00628
00629
00630 SetPublishingModeResponse response;
00631 FillResponseHeader(requestHeader, response.Header);
00632 response.Result.Results.resize(params.SubscriptionIds.size(), StatusCode::Good);
00633
00634 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00635 secureHeader.AddSize(RawSize(algorithmHeader));
00636 secureHeader.AddSize(RawSize(sequence));
00637 secureHeader.AddSize(RawSize(response));
00638
00639 if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Set Publishing Mode' request." << std::endl;
00640 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00641 return;
00642 }
00643
00644 case ADD_NODES_REQUEST:
00645 {
00646 if (Debug) std::clog << "opc_tcp_processor| Processing 'Add Nodes' request." << std::endl;
00647 AddNodesParameters params;
00648 istream >> params;
00649
00650 std::vector<AddNodesResult> results = Server->NodeManagement()->AddNodes(params.NodesToAdd);
00651
00652 AddNodesResponse response;
00653 FillResponseHeader(requestHeader, response.Header);
00654 response.results = results;
00655
00656 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00657 secureHeader.AddSize(RawSize(algorithmHeader));
00658 secureHeader.AddSize(RawSize(sequence));
00659 secureHeader.AddSize(RawSize(response));
00660
00661 if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Add Nodes' request." << std::endl;
00662 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00663 return;
00664 }
00665
00666 case ADD_REFERENCES_REQUEST:
00667 {
00668 if (Debug) std::clog << "opc_tcp_processor| Processing 'Add References' request." << std::endl;
00669 AddReferencesParameters params;
00670 istream >> params;
00671
00672 std::vector<StatusCode> results = Server->NodeManagement()->AddReferences(params.ReferencesToAdd);
00673
00674 AddReferencesResponse response;
00675 FillResponseHeader(requestHeader, response.Header);
00676 response.Results = results;
00677
00678 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00679 secureHeader.AddSize(RawSize(algorithmHeader));
00680 secureHeader.AddSize(RawSize(sequence));
00681 secureHeader.AddSize(RawSize(response));
00682
00683 if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Add References' request." << std::endl;
00684 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00685 return;
00686 }
00687
00688 case REPUBLISH_REQUEST:
00689 {
00690 if (Debug) std::clog << "opc_tcp_processor| Processing 'Republish' request." << std::endl;
00691 RepublishParameters params;
00692 istream >> params;
00693
00694
00695 RepublishResponse response;
00696 FillResponseHeader(requestHeader, response.Header);
00697 response.Header.ServiceResult = StatusCode::BadMessageNotAvailable;
00698
00699 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00700 secureHeader.AddSize(RawSize(algorithmHeader));
00701 secureHeader.AddSize(RawSize(sequence));
00702 secureHeader.AddSize(RawSize(response));
00703
00704 if (Debug) std::clog << "opc_tcp_processor| Sending response to 'Republish' request." << std::endl;
00705 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00706 return;
00707 }
00708
00709 case CALL_REQUEST:
00710 {
00711 if (Debug) std::clog << "opc_tcp_processor| Processing call request." << std::endl;
00712 CreateSessionParameters params;
00713 istream >> params;
00714
00715 CreateSessionResponse response;
00716 FillResponseHeader(requestHeader, response.Header);
00717
00718 response.Parameters.SessionId = SessionId;
00719 response.Parameters.AuthenticationToken = SessionId;
00720 response.Parameters.RevisedSessionTimeout = params.RequestedSessionTimeout;
00721 response.Parameters.MaxRequestMessageSize = 65536;
00722 GetEndpointsParameters epf;
00723 response.Parameters.ServerEndpoints = Server->Endpoints()->GetEndpoints(epf);
00724
00725
00726 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00727 secureHeader.AddSize(RawSize(algorithmHeader));
00728 secureHeader.AddSize(RawSize(sequence));
00729 secureHeader.AddSize(RawSize(response));
00730 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00731
00732 return;
00733 }
00734
00735
00736 default:
00737 {
00738 ServiceFaultResponse response;
00739 FillResponseHeader(requestHeader, response.Header);
00740 response.Header.ServiceResult = StatusCode::BadNotImplemented;
00741
00742 SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
00743 secureHeader.AddSize(RawSize(algorithmHeader));
00744 secureHeader.AddSize(RawSize(sequence));
00745 secureHeader.AddSize(RawSize(response));
00746
00747 if (Debug) std::cerr << "opc_tcp_processor| Sending ServiceFaultResponse to unsupported request of id: " << message << std::endl;
00748 ostream << secureHeader << algorithmHeader << sequence << response << flush;
00749 return;
00750 }
00751 }
00752 }
00753
00754 void OpcTcpMessages::FillResponseHeader(const RequestHeader& requestHeader, ResponseHeader& responseHeader)
00755 {
00756
00757 responseHeader.Timestamp = DateTime::Current();
00758 responseHeader.RequestHandle = requestHeader.RequestHandle;
00759 }
00760
00761 void OpcTcpMessages::DeleteAllSubscriptions()
00762 {
00763 std::vector<uint32_t> subs;
00764 for (const uint32_t& subid: Subscriptions)
00765 {
00766 subs.push_back(subid);
00767 }
00768 Server->Subscriptions()->DeleteSubscriptions(subs);
00769 Subscriptions.clear();
00770 }
00771
00772 void OpcTcpMessages::DeleteSubscriptions(const std::vector<uint32_t>& ids)
00773 {
00774 for ( auto id : ids )
00775 {
00776 Subscriptions.erase(std::remove_if(Subscriptions.begin(), Subscriptions.end(),
00777 [&](const uint32_t d) { return ( d == id) ; }), Subscriptions.end());
00778 }
00779 }
00780
00781 }
00782 }