server_sync.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <atomic>
20 #include <thread>
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/alloc.h>
25 #include <grpcpp/server.h>
26 #include <grpcpp/server_context.h>
27 
29 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
31 #include "test/cpp/qps/server.h"
33 
34 namespace grpc {
35 namespace testing {
36 
37 class BenchmarkServiceImpl final : public BenchmarkService::Service {
38  public:
40  SimpleResponse* response) override {
41  auto s = SetResponse(request, response);
42  if (!s.ok()) {
43  return s;
44  }
45  return Status::OK;
46  }
48  ServerContext* /*context*/,
51  while (stream->Read(&request)) {
53  auto s = SetResponse(&request, &response);
54  if (!s.ok()) {
55  return s;
56  }
57  if (!stream->Write(response)) {
58  return Status(StatusCode::INTERNAL, "Server couldn't respond");
59  }
60  }
61  return Status::OK;
62  }
65  SimpleResponse* response) override {
66  auto s = ClientPull(context, stream, response);
67  if (!s.ok()) {
68  return s;
69  }
70  return Status::OK;
71  }
73  const SimpleRequest* request,
76  auto s = SetResponse(request, &response);
77  if (!s.ok()) {
78  return s;
79  }
80  return ServerPush(context, stream, response, nullptr);
81  }
85  // Read the first client message to setup server response
87  if (!stream->Read(&request)) {
88  return Status::OK;
89  }
91  auto s = SetResponse(&request, &response);
92  if (!s.ok()) {
93  return s;
94  }
95  std::atomic_bool done;
96  Status sp;
97  std::thread t([context, stream, &response, &done, &sp]() {
98  sp = ServerPush(context, stream, response, [&done]() {
99  return done.load(std::memory_order_relaxed);
100  });
101  });
102  SimpleResponse phony;
103  auto cp = ClientPull(context, stream, &phony);
104  done.store(true, std::memory_order_relaxed); // can be lazy
105  t.join();
106  if (!cp.ok()) {
107  return cp;
108  }
109  if (!sp.ok()) {
110  return sp;
111  }
112  return Status::OK;
113  }
114 
115  private:
116  template <class R>
117  static Status ClientPull(ServerContext* /*context*/, R* stream,
120  while (stream->Read(&request)) {
121  }
122  if (request.response_size() > 0) {
123  if (!Server::SetPayload(request.response_type(), request.response_size(),
124  response->mutable_payload())) {
125  return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
126  }
127  }
128  return Status::OK;
129  }
130  template <class W>
131  static Status ServerPush(ServerContext* /*context*/, W* stream,
132  const SimpleResponse& response,
133  const std::function<bool()>& done) {
134  while ((done == nullptr) || !done()) {
135  // TODO(vjpai): Add potential for rate-pacing on this
136  if (!stream->Write(response)) {
137  return Status(StatusCode::INTERNAL, "Server couldn't push");
138  }
139  }
140  return Status::OK;
141  }
144  if (request->response_size() > 0) {
145  if (!Server::SetPayload(request->response_type(),
146  request->response_size(),
147  response->mutable_payload())) {
148  return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
149  }
150  }
151  return Status::OK;
152  }
153 };
154 
156  public:
157  explicit SynchronousServer(const ServerConfig& config) : Server(config) {
158  std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
159 
160  auto port_num = port();
161  // Negative port number means inproc server, so no listen port needed
162  if (port_num >= 0) {
164  builder->AddListeningPort(server_address.c_str(),
166  &port_num);
167  }
168 
170 
171  builder->RegisterService(&service_);
172 
173  impl_ = builder->BuildAndStart();
174  if (impl_ == nullptr) {
175  gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
176  } else {
177  gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
178  }
179  }
180 
181  std::shared_ptr<Channel> InProcessChannel(
182  const ChannelArguments& args) override {
183  return impl_->InProcessChannel(args);
184  }
185 
186  private:
188  std::unique_ptr<grpc::Server> impl_;
189 };
190 
191 std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
192  const ServerConfig& config) {
193  return std::unique_ptr<Server>(new SynchronousServer(config));
194 }
195 
196 } // namespace testing
197 } // namespace grpc
grpc::testing::SynchronousServer
Definition: server_sync.cc:155
messages_pb2.SimpleRequest
SimpleRequest
Definition: messages_pb2.py:597
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
grpc::ServerWriter
Definition: include/grpcpp/impl/codegen/completion_queue.h:60
grpc::testing::BenchmarkServiceImpl::UnaryCall
Status UnaryCall(ServerContext *, const SimpleRequest *request, SimpleResponse *response) override
Definition: server_sync.cc:39
grpc::ServerContext
Definition: grpcpp/impl/codegen/server_context.h:566
grpc
Definition: grpcpp/alarm.h:33
grpc::Status::ok
bool ok() const
Is the status OK?
Definition: include/grpcpp/impl/codegen/status.h:126
grpc::testing::SynchronousServer::service_
BenchmarkServiceImpl service_
Definition: server_sync.cc:187
benchmark.request
request
Definition: benchmark.py:77
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::ServerReaderWriter
Definition: grpcpp/impl/codegen/sync_stream.h:786
server_address
std::string server_address("0.0.0.0:10000")
qps_server_builder.h
grpc::testing::SynchronousServer::InProcessChannel
std::shared_ptr< Channel > InProcessChannel(const ChannelArguments &args) override
Definition: server_sync.cc:181
grpc::testing::SynchronousServer::SynchronousServer
SynchronousServer(const ServerConfig &config)
Definition: server_sync.cc:157
W
#define W
Definition: zlib/crc32.c:85
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc::testing::BenchmarkServiceImpl::StreamingFromServer
Status StreamingFromServer(ServerContext *context, const SimpleRequest *request, ServerWriter< SimpleResponse > *stream) override
Definition: server_sync.cc:72
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc::testing::SynchronousServer::impl_
std::unique_ptr< grpc::Server > impl_
Definition: server_sync.cc:188
grpc.h
grpc_core::JoinHostPort
std::string JoinHostPort(absl::string_view host, int port)
Definition: host_port.cc:32
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
grpc::testing::Server::ApplyConfigToBuilder
static void ApplyConfigToBuilder(const ServerConfig &config, ServerBuilder *builder)
Definition: test/cpp/qps/server.h:123
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
host_port.h
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
grpc::testing::Server::CreateServerCredentials
static std::shared_ptr< ServerCredentials > CreateServerCredentials(const ServerConfig &config)
Definition: test/cpp/qps/server.h:98
grpc::testing::Server::SetPayload
static bool SetPayload(PayloadType type, int size, Payload *payload)
Definition: test/cpp/qps/server.h:82
grpc::testing::Server
Definition: test/cpp/qps/server.h:42
server_credentials.h
grpc::testing::CreateSynchronousServer
std::unique_ptr< Server > CreateSynchronousServer(const ServerConfig &config)
Definition: server_sync.cc:191
grpc::testing::BenchmarkServiceImpl::StreamingCall
Status StreamingCall(ServerContext *, ServerReaderWriter< SimpleResponse, SimpleRequest > *stream) override
Definition: server_sync.cc:47
grpc::ChannelArguments
Definition: grpcpp/support/channel_arguments.h:39
grpc::testing::CreateQpsServerBuilder
std::unique_ptr< ServerBuilder > CreateQpsServerBuilder()
Definition: qps_server_builder.cc:37
grpc::testing::BenchmarkServiceImpl
Definition: server_sync.cc:37
grpc::testing::BenchmarkServiceImpl::ClientPull
static Status ClientPull(ServerContext *, R *stream, SimpleResponse *response)
Definition: server_sync.cc:117
alloc.h
server_context.h
asyncio_get_stats.response
response
Definition: asyncio_get_stats.py:28
grpc::testing::Server::port
int port() const
Definition: test/cpp/qps/server.h:96
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
grpc::ServerReader
Definition: include/grpcpp/impl/codegen/completion_queue.h:58
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
config_s
Definition: bloaty/third_party/zlib/deflate.c:120
grpc::testing::BenchmarkServiceImpl::StreamingBothWays
Status StreamingBothWays(ServerContext *context, ServerReaderWriter< SimpleResponse, SimpleRequest > *stream) override
Definition: server_sync.cc:82
grpc::testing::BenchmarkServiceImpl::StreamingFromClient
Status StreamingFromClient(ServerContext *context, ServerReader< SimpleRequest > *stream, SimpleResponse *response) override
Definition: server_sync.cc:63
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
server.h
usage_timer.h
grpc.StatusCode.INTERNAL
tuple INTERNAL
Definition: src/python/grpcio/grpc/__init__.py:277
grpc::testing::BenchmarkServiceImpl::ServerPush
static Status ServerPush(ServerContext *, W *stream, const SimpleResponse &response, const std::function< bool()> &done)
Definition: server_sync.cc:131
function
std::function< bool(GrpcTool *, int, const char **, const CliCredentials &, GrpcToolOutputCallback)> function
Definition: grpc_tool.cc:250
messages_pb2.SimpleResponse
SimpleResponse
Definition: messages_pb2.py:604
thread
static uv_thread_t thread
Definition: test-async-null-cb.c:29
server.h
grpc::testing::BenchmarkServiceImpl::SetResponse
static Status SetResponse(const SimpleRequest *request, SimpleResponse *response)
Definition: server_sync.cc:142
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:01:17