interop_server.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <fstream>
20 #include <memory>
21 #include <sstream>
22 #include <thread>
23 
24 #include "absl/flags/flag.h"
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33 
35 #include "src/proto/grpc/testing/empty.pb.h"
36 #include "src/proto/grpc/testing/messages.pb.h"
37 #include "src/proto/grpc/testing/test.grpc.pb.h"
40 
// Command-line flags of the interop server, defined with Abseil's ABSL_FLAG
// (type, name, default, help text).
// NOTE(review): listing line 44 is absent from this extract, so the help
// string on line 45 has lost the ABSL_FLAG(...) opening it belongs to —
// presumably a string flag for a custom credentials type; confirm against the
// real source.
41 ABSL_FLAG(bool, use_alts, false,
42  "Whether to use alts. Enable alts will disable tls.");
43 ABSL_FLAG(bool, use_tls, false, "Whether to use tls.");
45  "User provided credentials type.");
// The default port 0 is rejected by the GPR_ASSERT(port != 0) in RunServer, so
// a port has to be supplied by the caller or on the command line.
46 ABSL_FLAG(int32_t, port, 0, "Server port.");
// A negative value (the default) leaves the builder's send limit untouched:
// RunServer only calls SetMaxSendMessageSize when this flag is >= 0.
47 ABSL_FLAG(int32_t, max_send_message_size, -1, "The maximum send message size.");
48 
49 using grpc::Server;
53 using grpc::ServerReader;
55 using grpc::ServerWriter;
56 using grpc::Status;
57 using grpc::WriteOptions;
66 using grpc::testing::TestService;
67 
// Client metadata keys that MaybeEchoMetadata looks up on every call.
// Value of this key is sent back through ServerContext::AddInitialMetadata.
68 const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial";
// Value of this key is sent back through ServerContext::AddTrailingMetadata.
69 const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin";
// Presence of this key makes the server echo the client's "user-agent" value
// as initial metadata.
70 const char kEchoUserAgentKey[] = "x-grpc-test-echo-useragent";
71 
// MaybeEchoMetadata: copies selected client metadata values back to the client.
// NOTE(review): listing line 72 (the signature) is missing from this extract;
// the cross-reference index at the end of the page records it as
// `void MaybeEchoMetadata(ServerContext *context)`.
73  const auto& client_metadata = context->client_metadata();
    // Each echo key may be present at most once; a duplicate trips the assert.
74  GPR_ASSERT(client_metadata.count(kEchoInitialMetadataKey) <= 1);
75  GPR_ASSERT(client_metadata.count(kEchoTrailingBinMetadataKey) <= 1);
76 
    // Echo the initial-metadata test value, if the client sent one.
    // NOTE(review): listing line 80 (first argument, the metadata key) is
    // missing here — presumably kEchoInitialMetadataKey; confirm.
77  auto iter = client_metadata.find(kEchoInitialMetadataKey);
78  if (iter != client_metadata.end()) {
79  context->AddInitialMetadata(
81  std::string(iter->second.begin(), iter->second.end()));
82  }
    // Echo the trailing-metadata test value, if the client sent one.
    // NOTE(review): listing line 86 (the key argument) is missing here —
    // presumably kEchoTrailingBinMetadataKey; confirm.
83  iter = client_metadata.find(kEchoTrailingBinMetadataKey);
84  if (iter != client_metadata.end()) {
85  context->AddTrailingMetadata(
87  std::string(iter->second.begin(), iter->second.end()));
88  }
89  // Check if client sent a magic key in the header that makes us echo
90  // back the user-agent (for testing purpose)
91  iter = client_metadata.find(kEchoUserAgentKey);
92  if (iter != client_metadata.end()) {
    // `iter` is reused: from here on it refers to the "user-agent" entry.
93  iter = client_metadata.find("user-agent");
94  if (iter != client_metadata.end()) {
    // NOTE(review): listing line 96 (the key under which the user-agent is
    // echoed) is missing from this extract.
95  context->AddInitialMetadata(
97  std::string(iter->second.begin(), iter->second.end()));
98  }
99  }
100 }
101 
// SetPayload: fills `payload`'s body with `size` zero bytes.
// NOTE(review): listing line 102 (the signature) is missing from this extract;
// the cross-reference index records it as
// `bool SetPayload(int size, Payload *payload)`.
// Always returns true as written, so the callers' error branches on a false
// result are never taken.
// NOTE(review): `size` is a signed int and is not range-checked here; only
// UnaryCall guards it with `> 0` before calling.
    // `new char[size]()` value-initializes the buffer, i.e. all bytes are 0.
103  std::unique_ptr<char[]> body(new char[size]());
104  payload->set_body(body.get(), size);
105  return true;
106 }
107 
// CheckExpectedCompression: verifies that the request's compression state
// matches what the client said to expect. Returns true when it matches and
// false otherwise.
// NOTE(review): listing line 108 (start of the signature) is missing from
// this extract; the cross-reference index records the full signature as
// `bool CheckExpectedCompression(const ServerContext &context,
//                                const bool compression_expected)`.
// NOTE(review): listing lines 117, 122 and 130 are also missing; each precedes
// an orphaned message string, so they presumably open a logging call — confirm.
109  const bool compression_expected) {
110  const InteropServerContextInspector inspector(context);
111  const grpc_compression_algorithm received_compression =
112  inspector.GetCallCompressionAlgorithm();
113 
114  if (compression_expected) {
    // Two conditions must hold: the call used a compression algorithm other
    // than NONE, and the inspector reports the message as compressed.
115  if (received_compression == GRPC_COMPRESS_NONE) {
116  // Expected some compression, got NONE. This is an error.
118  "Expected compression but got uncompressed request from client.");
119  return false;
120  }
121  if (!(inspector.WasCompressed())) {
123  "Failure: Requested compression in a compressable request, but "
124  "compression bit in message flags not set.");
125  return false;
126  }
127  } else {
128  // Didn't expect compression -> make sure the request is uncompressed
129  if (inspector.WasCompressed()) {
131  "Failure: Didn't requested compression, but compression bit in "
132  "message flags set.");
133  return false;
134  }
135  }
136  return true;
137 }
138 
// TestServiceImpl: the interop test service, overriding the RPC handlers of
// the generated TestService::Service base class.
// NOTE(review): this extract dropped many listing lines (among them the first
// line of every handler signature). The handler signatures quoted in the
// comments below are taken from the cross-reference index at the end of the
// page; anything else about the missing lines is marked as an assumption.
139 class TestServiceImpl : public TestService::Service {
140  public:
    // EmptyCall(ServerContext *context, const Empty *, Empty *): ignores the
    // request and response objects and returns OK.
    // NOTE(review): listing lines 141 and 144 are missing from this extract.
142  const grpc::testing::Empty* /*request*/,
143  grpc::testing::Empty* /*response*/) override {
145  return Status::OK;
146  }
147 
148  // Response contains current timestamp. We ignore everything in the request.
    // CacheableUnaryCall(ServerContext *context, const SimpleRequest *,
    // SimpleResponse *response): the payload body is the decimal text of
    // ts.tv_nsec, and a "cache-control" initial metadata entry is added.
    // NOTE(review): listing lines 149 and 152 are missing; `ts` is declared
    // on line 152, so which clock it is read from is not visible here.
150  const SimpleRequest* /*request*/,
151  SimpleResponse* response) override {
153  std::string timestamp = std::to_string(ts.tv_nsec);
154  response->mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
155  context->AddInitialMetadata("cache-control", "max-age=60, public");
156  return Status::OK;
157  }
158 
    // UnaryCall(ServerContext *context, const SimpleRequest *request,
    // SimpleResponse *response): applies the requested response compression,
    // checks the request-compression expectation, fills the response payload
    // and returns either the status the client asked for or OK.
    // NOTE(review): listing lines 159, 161, 173, 175 and 180 are missing from
    // this extract.
160  SimpleResponse* response) override {
    // Only touch the compression level when the client set the field at all.
162  if (request->has_response_compressed()) {
163  const bool compression_requested = request->response_compressed().value();
164  gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
165  compression_requested ? "enabled" : "disabled", __func__);
166  if (compression_requested) {
167  // Any level would do, let's go for HIGH because we are overachievers.
168  context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
169  } else {
170  context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE);
171  }
172  }
    // NOTE(review): the condition opening on missing line 173 and the return
    // opening on missing line 175 are not visible; presumably a failed
    // CheckExpectedCompression yields an error status with this message.
174  request->expect_compressed().value())) {
176  "Compressed request expectation not met.");
177  }
    // A payload is only generated for a strictly positive requested size.
178  if (request->response_size() > 0) {
179  if (!SetPayload(request->response_size(), response->mutable_payload())) {
181  "Error creating payload.");
182  }
183  }
184 
    // The client may dictate the status code and message of the reply.
185  if (request->has_response_status()) {
186  return Status(
187  static_cast<grpc::StatusCode>(request->response_status().code()),
188  request->response_status().message());
189  }
190 
191  return Status::OK;
192  }
193 
    // StreamingOutputCall(ServerContext *context,
    // const StreamingOutputCallRequest *request,
    // ServerWriter<StreamingOutputCallResponse> *writer): writes one response
    // per response_parameters entry and stops at the first failed write.
    // NOTE(review): listing lines 194-197 (signature and the declaration of
    // `response`), 203 and 222-223 are missing from this extract.
198  bool write_success = true;
199  for (int i = 0; write_success && i < request->response_parameters_size();
200  i++) {
201  if (!SetPayload(request->response_parameters(i).size(),
202  response.mutable_payload())) {
204  "Error creating payload.");
205  }
206  WriteOptions wopts;
207  if (request->response_parameters(i).has_compressed()) {
208  // Compress by default. Disabled on a per-message basis.
209  context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
210  const bool compression_requested =
211  request->response_parameters(i).compressed().value();
212  gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
213  compression_requested ? "enabled" : "disabled", __func__);
214  if (!compression_requested) {
215  wopts.set_no_compression();
216  } // else, compression is already enabled via the context.
217  }
218  int time_us;
219  if ((time_us = request->response_parameters(i).interval_us()) > 0) {
220  // Sleep before response if needed
    // NOTE(review): the initializer of sleep_time (lines 222-223) is missing;
    // presumably "now + time_us microseconds" — confirm.
221  gpr_timespec sleep_time =
224  gpr_sleep_until(sleep_time);
225  }
226  write_success = writer->Write(response, wopts);
227  }
228  if (write_success) {
229  return Status::OK;
230  } else {
231  return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
232  }
233  }
234 
    // StreamingInputCall(ServerContext *context,
    // ServerReader<StreamingInputCallRequest> *reader,
    // StreamingInputCallResponse *response): reads requests until the stream
    // ends and reports the summed size of all payload bodies.
    // NOTE(review): listing lines 235-238 (signature and the declaration of
    // `request`), 241 and 243 are missing from this extract.
239  int aggregated_payload_size = 0;
240  while (reader->Read(&request)) {
242  request.expect_compressed().value())) {
244  "Compressed request expectation not met.");
245  }
246  if (request.has_payload()) {
    // The body size is accumulated into a plain int.
247  aggregated_payload_size += request.payload().body().size();
248  }
249  }
250  response->set_aggregated_payload_size(aggregated_payload_size);
251  return Status::OK;
252  }
253 
    // FullDuplexCall(ServerContext *context,
    // ServerReaderWriter<StreamingOutputCallResponse,
    //                    StreamingOutputCallRequest> *stream):
    // answers each incoming request as it is read.
    // NOTE(review): listing lines 254-256, 258-260 (including the declarations
    // of `request` and `response`) and 276-277 are missing from this extract.
257  StreamingOutputCallRequest>* stream) override {
261  bool write_success = true;
262  while (write_success && stream->Read(&request)) {
    // A request carrying a response_status ends the call immediately with
    // that status.
263  if (request.has_response_status()) {
264  return Status(
265  static_cast<grpc::StatusCode>(request.response_status().code()),
266  request.response_status().message());
267  }
    // Only the first response_parameters entry is used; requests without any
    // entry produce no response.
268  if (request.response_parameters_size() != 0) {
269  response.mutable_payload()->set_type(request.payload().type());
270  response.mutable_payload()->set_body(
271  std::string(request.response_parameters(0).size(), '\0'));
272  int time_us;
273  if ((time_us = request.response_parameters(0).interval_us()) > 0) {
274  // Sleep before response if needed
275  gpr_timespec sleep_time =
278  gpr_sleep_until(sleep_time);
279  }
280  write_success = stream->Write(response);
281  }
282  }
283  if (write_success) {
284  return Status::OK;
285  } else {
286  return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
287  }
288  }
289 
    // HalfDuplexCall(ServerContext *,
    // ServerReaderWriter<StreamingOutputCallResponse,
    //                    StreamingOutputCallRequest> *stream):
    // buffers every request first, then writes one response per request.
    // NOTE(review): listing lines 290, 292, 295, 300 and 305 are missing from
    // this extract.
291  ServerContext* /*context*/,
293  StreamingOutputCallRequest>* stream) override {
294  std::vector<StreamingOutputCallRequest> requests;
    // Phase 1: drain the request stream; each request is copied into the vector.
296  while (stream->Read(&request)) {
297  requests.push_back(request);
298  }
299 
    // Phase 2: reply in arrival order, stopping at the first failed write.
301  bool write_success = true;
302  for (unsigned int i = 0; write_success && i < requests.size(); i++) {
303  response.mutable_payload()->set_type(requests[i].payload().type());
    // NOTE(review): the statement opening on missing line 305 is not visible;
    // presumably it returns an error status, which would keep the
    // response_parameters(0) access below from running on an empty list —
    // confirm.
304  if (requests[i].response_parameters_size() == 0) {
306  "Request does not have response parameters.");
307  }
308  response.mutable_payload()->set_body(
309  std::string(requests[i].response_parameters(0).size(), '\0'));
310  write_success = stream->Write(response);
311  }
312  if (write_success) {
313  return Status::OK;
314  } else {
315  return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
316  }
317  }
318 };
319 
// RunServer overload taking only credentials: forwards to the four-argument
// overload with the --port flag value, no started-condition and no extra
// builder options.
// NOTE(review): listing line 320 (start of the signature) is missing from
// this extract; the cross-reference index records it as
// `void RunServer(const std::shared_ptr< ServerCredentials > &creds)`.
321  const std::shared_ptr<ServerCredentials>& creds) {
322  RunServer(creds, absl::GetFlag(FLAGS_port), nullptr, nullptr);
323 }
324 
// RunServer overload taking credentials and builder options: forwards to the
// four-argument overload with the --port flag value and no started-condition.
// NOTE(review): listing lines 325 (start of the signature) and 330 (the last
// call argument) are missing from this extract; line 330 presumably passes
// `server_options` on via std::move — confirm.
326  const std::shared_ptr<ServerCredentials>& creds,
327  std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
328  server_options) {
329  RunServer(creds, absl::GetFlag(FLAGS_port), nullptr,
331 }
332 
// RunServer overload taking credentials, an explicit port and a
// started-condition: forwards to the four-argument overload with no extra
// builder options.
// NOTE(review): listing line 333 (start of the signature) is missing from
// this extract.
334  const std::shared_ptr<ServerCredentials>& creds, const int port,
335  ServerStartedCondition* server_started_condition) {
336  RunServer(creds, port, server_started_condition, nullptr);
337 }
338 
// RunServer, main overload: builds and starts the interop server on
// 0.0.0.0:<port> with the given credentials, applies optional builder
// options, and signals `server_started_condition` (when non-null) once the
// server has been built.
// NOTE(review): listing lines 339 (start of the signature), 347, 349-350, 352
// (presumably the declarations of `service` and `builder`) and 373-375 are
// missing from this extract.
340  const std::shared_ptr<ServerCredentials>& creds, const int port,
341  ServerStartedCondition* server_started_condition,
342  std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
343  server_options) {
    // Port 0 is refused; note that the --port flag itself defaults to 0.
344  GPR_ASSERT(port != 0);
345  std::ostringstream server_address;
346  server_address << "0.0.0.0:" << port;
348 
351 
353  builder.RegisterService(&service);
354  builder.AddListeningPort(server_address.str(), creds);
355  if (server_options != nullptr) {
    // Each option is moved into the builder, leaving null entries behind in
    // the vector.
356  for (size_t i = 0; i < server_options->size(); i++) {
357  builder.SetOption(std::move((*server_options)[i]));
358  }
359  }
    // A negative flag value leaves the send limit untouched.
360  if (absl::GetFlag(FLAGS_max_send_message_size) >= 0) {
361  builder.SetMaxSendMessageSize(absl::GetFlag(FLAGS_max_send_message_size));
362  }
    // NOTE(review): the result of BuildAndStart() is not checked for null in
    // the visible lines before "listening" is logged.
363  std::unique_ptr<Server> server(builder.BuildAndStart());
364  gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
365 
366  // Signal that the server has started.
367  if (server_started_condition) {
    // The flag is set and waiters are notified while holding the condition's
    // mutex.
368  std::unique_lock<std::mutex> lock(server_started_condition->mutex);
369  server_started_condition->server_started = true;
370  server_started_condition->condition.notify_all();
371  }
372 
    // NOTE(review): lines 373-375 are missing; the closing brace below
    // presumably ends a wait loop that keeps the server alive — confirm.
376  }
377 }
gpr_timespec::tv_nsec
int32_t tv_nsec
Definition: gpr_types.h:52
GPR_TIMESPAN
@ GPR_TIMESPAN
Definition: gpr_types.h:45
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
grpc::testing::InteropServerContextInspector::WasCompressed
bool WasCompressed() const
Definition: server_helper.cc:67
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc::ServerWriter
Definition: include/grpcpp/impl/codegen/completion_queue.h:60
gpr_atm_no_barrier_load
#define gpr_atm_no_barrier_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:53
grpc::ServerContext
Definition: grpcpp/impl/codegen/server_context.h:566
log.h
SetPayload
bool SetPayload(int size, Payload *payload)
Definition: interop_server.cc:102
server_helper.h
messages_pb2.StreamingInputCallResponse
StreamingInputCallResponse
Definition: messages_pb2.py:618
MaybeEchoMetadata
void MaybeEchoMetadata(ServerContext *context)
Definition: interop_server.cc:72
string.h
grpc_compression_algorithm
grpc_compression_algorithm
Definition: compression_types.h:60
benchmark.request
request
Definition: benchmark.py:77
kEchoTrailingBinMetadataKey
const char kEchoTrailingBinMetadataKey[]
Definition: interop_server.cc:69
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
TestServiceImpl::EmptyCall
Status EmptyCall(ServerContext *context, const grpc::testing::Empty *, grpc::testing::Empty *) override
Definition: interop_server.cc:141
framework.rpc.grpc_channelz.Server
Server
Definition: grpc_channelz.py:42
OK
@ OK
Definition: cronet_status.h:43
grpc::ServerReaderWriter
Definition: grpcpp/impl/codegen/sync_stream.h:786
TestServiceImpl::CacheableUnaryCall
Status CacheableUnaryCall(ServerContext *context, const SimpleRequest *, SimpleResponse *response) override
Definition: interop_server.cc:149
time.h
grpc::testing::TestServiceImpl
TestMultipleServiceImpl< grpc::testing::EchoTestService::Service > TestServiceImpl
Definition: test_service_impl.h:498
GRPC_COMPRESS_NONE
@ GRPC_COMPRESS_NONE
Definition: compression_types.h:61
server_address
std::string server_address("0.0.0.0:10000")
grpc::testing::interop::g_got_sigint
gpr_atm g_got_sigint
Definition: interop_server_bootstrap.cc:25
grpc::WriteOptions
Per-message write options.
Definition: call_op_set.h:81
TestServiceImpl::UnaryCall
Status UnaryCall(ServerContext *context, const SimpleRequest *request, SimpleResponse *response) override
Definition: interop_server.cc:159
server
std::unique_ptr< Server > server
Definition: channelz_service_test.cc:330
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc.StatusCode
Definition: src/python/grpcio/grpc/__init__.py:232
grpc::ServerBuilder
A builder class for the creation and startup of grpc::Server instances.
Definition: grpcpp/server_builder.h:86
CheckExpectedCompression
bool CheckExpectedCompression(const ServerContext &context, const bool compression_expected)
Definition: interop_server.cc:108
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc::testing::interop::RunServer
void RunServer(const std::shared_ptr< ServerCredentials > &creds)
Definition: interop_server.cc:320
grpc::testing::InteropServerContextInspector::GetCallCompressionAlgorithm
grpc_compression_algorithm GetCallCompressionAlgorithm() const
Definition: server_helper.cc:58
gpr_sleep_until
GPRAPI void gpr_sleep_until(gpr_timespec until)
grpc.h
kEchoInitialMetadataKey
const char kEchoInitialMetadataKey[]
Definition: interop_server.cc:68
GRPC_COMPRESS_LEVEL_NONE
@ GRPC_COMPRESS_LEVEL_NONE
Definition: compression_types.h:73
custom_credentials_type
std::string custom_credentials_type("INSECURE_CREDENTIALS")
absl::GetFlag
ABSL_MUST_USE_RESULT T GetFlag(const absl::Flag< T > &flag)
Definition: abseil-cpp/absl/flags/flag.h:98
messages_pb2.StreamingOutputCallRequest
StreamingOutputCallRequest
Definition: messages_pb2.py:632
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
GRPC_COMPRESS_LEVEL_HIGH
@ GRPC_COMPRESS_LEVEL_HIGH
Definition: compression_types.h:76
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
messages_pb2.Payload
Payload
Definition: messages_pb2.py:583
server_credentials.h
writer
void writer(void *n)
Definition: libuv/docs/code/locks/main.c:22
tests.unit._exit_scenarios.port
port
Definition: _exit_scenarios.py:179
TestServiceImpl::HalfDuplexCall
Status HalfDuplexCall(ServerContext *, ServerReaderWriter< StreamingOutputCallResponse, StreamingOutputCallRequest > *stream) override
Definition: interop_server.cc:290
grpc.beta.implementations.server_options
def server_options(multi_method_implementation=None, request_deserializers=None, response_serializers=None, thread_pool=None, thread_pool_size=None, default_timeout=None, maximum_timeout=None)
Definition: implementations.py:258
GPR_CLOCK_PRECISE
@ GPR_CLOCK_PRECISE
Definition: gpr_types.h:42
kEchoUserAgentKey
const char kEchoUserAgentKey[]
Definition: interop_server.cc:70
gpr_time_add
GPRAPI gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b)
Definition: src/core/lib/gpr/time.cc:135
TestServiceImpl::FullDuplexCall
Status FullDuplexCall(ServerContext *context, ServerReaderWriter< StreamingOutputCallResponse, StreamingOutputCallRequest > *stream) override
Definition: interop_server.cc:254
TestServiceImpl::StreamingInputCall
Status StreamingInputCall(ServerContext *context, ServerReader< StreamingInputCallRequest > *reader, StreamingInputCallResponse *response) override
Definition: interop_server.cc:235
gpr_time_from_micros
GPRAPI gpr_timespec gpr_time_from_micros(int64_t us, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:115
server_context.h
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
grpc.StatusCode.INVALID_ARGUMENT
tuple INVALID_ARGUMENT
Definition: src/python/grpcio/grpc/__init__.py:263
grpc::ServerReader
Definition: include/grpcpp/impl/codegen/completion_queue.h:58
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
RunServer
void RunServer()
Definition: examples/cpp/compression/greeter_server.cc:51
messages_pb2.StreamingInputCallRequest
StreamingInputCallRequest
Definition: messages_pb2.py:611
test_config.h
grpc::testing::interop::ServerStartedCondition
Definition: server_helper.h:56
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
ABSL_FLAG
ABSL_FLAG(bool, use_alts, false, "Whether to use alts. Enable alts will disable tls.")
messages_pb2.StreamingOutputCallResponse
StreamingOutputCallResponse
Definition: messages_pb2.py:639
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
iter
Definition: test_winkernel.cpp:47
TestServiceImpl::StreamingOutputCall
Status StreamingOutputCall(ServerContext *context, const StreamingOutputCallRequest *request, ServerWriter< StreamingOutputCallResponse > *writer) override
Definition: interop_server.cc:194
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
grpc.beta.implementations.ServerCredentials
ServerCredentials
Definition: implementations.py:231
server.h
gpr_timespec
Definition: gpr_types.h:50
service
__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod NSString * service
Definition: ProtoMethod.h:25
grpc.StatusCode.INTERNAL
tuple INTERNAL
Definition: src/python/grpcio/grpc/__init__.py:277
size
voidpf void uLong size
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136
messages_pb2.SimpleResponse
SimpleResponse
Definition: messages_pb2.py:604
grpc::WriteOptions::set_no_compression
WriteOptions & set_no_compression()
Definition: call_op_set.h:94
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
int32_t
signed int int32_t
Definition: stdint-msvc2008.h:77
gen_server_registered_method_bad_client_test_body.payload
list payload
Definition: gen_server_registered_method_bad_client_test_body.py:40
grpc::testing::InteropServerContextInspector
Definition: server_helper.h:37
to_string
static bool to_string(zval *from)
Definition: protobuf/php/ext/google/protobuf/convert.c:333
TestServiceImpl
Definition: interop_server.cc:139
server_builder.h
reader
void reader(void *n)
Definition: libuv/docs/code/locks/main.c:8
i
uint64_t i
Definition: abseil-cpp/absl/container/btree_benchmark.cc:230
gpr_time_from_seconds
GPRAPI gpr_timespec gpr_time_from_seconds(int64_t s, gpr_clock_type clock_type)
Definition: src/core/lib/gpr/time.cc:123
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:07