qps_worker.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
21 #include <memory>
22 #include <mutex>
23 #include <sstream>
24 #include <string>
25 #include <thread>
26 #include <vector>
27 
28 #include "absl/memory/memory.h"
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/cpu.h>
33 #include <grpc/support/log.h>
34 #include <grpcpp/client_context.h>
36 #include <grpcpp/server.h>
37 #include <grpcpp/server_builder.h>
38 
40 #include "src/proto/grpc/testing/worker_service.grpc.pb.h"
43 #include "test/cpp/qps/client.h"
45 #include "test/cpp/qps/server.h"
48 
49 namespace grpc {
50 namespace testing {
51 
52 static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
53  gpr_log(GPR_INFO, "Starting client of type %s %s %d",
54  ClientType_Name(config.client_type()).c_str(),
55  RpcType_Name(config.rpc_type()).c_str(),
56  config.payload_config().has_bytebuf_params());
57 
58  switch (config.client_type()) {
59  case ClientType::SYNC_CLIENT:
61  case ClientType::ASYNC_CLIENT:
62  return config.payload_config().has_bytebuf_params()
65  case ClientType::CALLBACK_CLIENT:
67  default:
68  abort();
69  }
70 }
71 
72 static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
73  gpr_log(GPR_INFO, "Starting server of type %s",
74  ServerType_Name(config.server_type()).c_str());
75 
76  switch (config.server_type()) {
77  case ServerType::SYNC_SERVER:
79  case ServerType::ASYNC_SERVER:
80  return CreateAsyncServer(config);
81  case ServerType::ASYNC_GENERIC_SERVER:
83  case ServerType::CALLBACK_SERVER:
85  default:
86  abort();
87  }
88 }
89 
90 class ScopedProfile final {
91  public:
92  ScopedProfile(const char* filename, bool enable) : enable_(enable) {
94  }
97  }
98 
99  private:
100  const bool enable_;
101 };
102 
103 class WorkerServiceImpl final : public WorkerService::Service {
104  public:
107 
111  gpr_log(GPR_INFO, "RunClient: Entering");
112  InstanceGuard g(this);
113  if (!g.Acquired()) {
114  return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
115  }
116 
117  ScopedProfile profile("qps_client.prof", false);
119  gpr_log(GPR_INFO, "RunClient: Returning");
120  return ret;
121  }
122 
126  gpr_log(GPR_INFO, "RunServer: Entering");
127  InstanceGuard g(this);
128  if (!g.Acquired()) {
129  return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
130  }
131 
132  ScopedProfile profile("qps_server.prof", false);
134  gpr_log(GPR_INFO, "RunServer: Returning");
135  return ret;
136  }
137 
138  Status CoreCount(ServerContext* /*ctx*/, const CoreRequest*,
139  CoreResponse* resp) override {
140  resp->set_cores(gpr_cpu_num_cores());
141  return Status::OK;
142  }
143 
144  Status QuitWorker(ServerContext* /*ctx*/, const Void*, Void*) override {
145  InstanceGuard g(this);
146  if (!g.Acquired()) {
147  return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
148  }
149 
150  worker_->MarkDone();
151  return Status::OK;
152  }
153 
154  private:
155  // Protect against multiple clients using this worker at once.
157  public:
159  : impl_(impl), acquired_(impl->TryAcquireInstance()) {}
161  if (acquired_) {
163  }
164  }
165 
166  bool Acquired() const { return acquired_; }
167 
168  private:
170  const bool acquired_;
171  };
172 
174  std::lock_guard<std::mutex> g(mu_);
175  if (acquired_) return false;
176  acquired_ = true;
177  return true;
178  }
179 
181  std::lock_guard<std::mutex> g(mu_);
183  acquired_ = false;
184  }
185 
188  ClientArgs args;
189  if (!stream->Read(&args)) {
190  return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
191  }
192  if (!args.has_setup()) {
193  return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
194  }
195  gpr_log(GPR_INFO, "RunClientBody: about to create client");
196  std::unique_ptr<Client> client = CreateClient(args.setup());
197  if (!client) {
198  return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
199  }
200  gpr_log(GPR_INFO, "RunClientBody: client created");
201  ClientStatus status;
202  if (!stream->Write(status)) {
203  return Status(StatusCode::UNKNOWN, "Client couldn't report init status");
204  }
205  gpr_log(GPR_INFO, "RunClientBody: creation status reported");
206  while (stream->Read(&args)) {
207  gpr_log(GPR_INFO, "RunClientBody: Message read");
208  if (!args.has_mark()) {
209  gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
210  return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
211  }
212  *status.mutable_stats() = client->Mark(args.mark().reset());
213  if (!stream->Write(status)) {
214  return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
215  }
216  gpr_log(GPR_INFO, "RunClientBody: Mark response given");
217  }
218 
219  gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
220  client->AwaitThreadsCompletion();
221 
222  gpr_log(GPR_INFO, "RunClientBody: Returning");
223  return Status::OK;
224  }
225 
228  ServerArgs args;
229  if (!stream->Read(&args)) {
230  return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args");
231  }
232  if (!args.has_setup()) {
233  return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
234  }
235  if (server_port_ > 0 && args.setup().port() == 0) {
236  args.mutable_setup()->set_port(server_port_);
237  }
238  gpr_log(GPR_INFO, "RunServerBody: about to create server");
239  std::unique_ptr<Server> server = CreateServer(args.setup());
240  if (g_inproc_servers != nullptr) {
241  g_inproc_servers->push_back(server.get());
242  }
243  if (!server) {
244  return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
245  }
246  gpr_log(GPR_INFO, "RunServerBody: server created");
247  ServerStatus status;
248  status.set_port(server->port());
249  status.set_cores(server->cores());
250  if (!stream->Write(status)) {
251  return Status(StatusCode::UNKNOWN, "Server couldn't report init status");
252  }
253  gpr_log(GPR_INFO, "RunServerBody: creation status reported");
254  while (stream->Read(&args)) {
255  gpr_log(GPR_INFO, "RunServerBody: Message read");
256  if (!args.has_mark()) {
257  gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
258  return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
259  }
260  *status.mutable_stats() = server->Mark(args.mark().reset());
261  if (!stream->Write(status)) {
262  return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
263  }
264  gpr_log(GPR_INFO, "RunServerBody: Mark response given");
265  }
266 
267  gpr_log(GPR_INFO, "RunServerBody: Returning");
268  return Status::OK;
269  }
270 
272  bool acquired_;
275 };
276 
277 QpsWorker::QpsWorker(int driver_port, int server_port,
278  const std::string& credential_type) {
279  impl_ = absl::make_unique<WorkerServiceImpl>(server_port, this);
280  gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0));
281 
282  std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
283  builder->AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0);
284  if (driver_port >= 0) {
286  builder->AddListeningPort(
287  server_address.c_str(),
288  GetCredentialsProvider()->GetServerCredentials(credential_type));
289  }
290  builder->RegisterService(impl_.get());
291 
292  server_ = builder->BuildAndStart();
293  if (server_ == nullptr) {
295  "QpsWorker: Fail to BuildAndStart(driver_port=%d, server_port=%d)",
296  driver_port, server_port);
297  } else {
299  "QpsWorker: BuildAndStart(driver_port=%d, server_port=%d) done",
300  driver_port, server_port);
301  }
302 }
303 
305 
306 bool QpsWorker::Done() const {
307  return (gpr_atm_acq_load(&done_) != static_cast<gpr_atm>(0));
308 }
310  gpr_atm_rel_store(&done_, static_cast<gpr_atm>(1));
311 }
312 } // namespace testing
313 } // namespace grpc
grpc::testing::WorkerServiceImpl::RunClientBody
Status RunClientBody(ServerContext *, ServerReaderWriter< ClientStatus, ClientArgs > *stream)
Definition: qps_worker.cc:186
test_credentials_provider.h
grpc::testing::WorkerServiceImpl::InstanceGuard::Acquired
bool Acquired() const
Definition: qps_worker.cc:166
gpr_cpu_num_cores
GPRAPI unsigned gpr_cpu_num_cores(void)
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
testing
Definition: aws_request_signer_test.cc:25
grpc::status
auto status
Definition: cpp/client/credentials_test.cc:200
filename
const char * filename
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:135
grpc::ServerContext
Definition: grpcpp/impl/codegen/server_context.h:566
grpc::testing::WorkerServiceImpl::WorkerServiceImpl
WorkerServiceImpl(int server_port, QpsWorker *worker)
Definition: qps_worker.cc:105
log.h
ctx
Definition: benchmark-async.c:30
grpc::testing::WorkerServiceImpl::InstanceGuard::~InstanceGuard
~InstanceGuard()
Definition: qps_worker.cc:160
grpc::testing::CreateGenericAsyncStreamingClient
std::unique_ptr< Client > CreateGenericAsyncStreamingClient(const ClientConfig &config)
Definition: client_async.cc:955
grpc
Definition: grpcpp/alarm.h:33
grpc::testing::WorkerServiceImpl::RunClient
Status RunClient(ServerContext *ctx, ServerReaderWriter< ClientStatus, ClientArgs > *stream) override
Definition: qps_worker.cc:108
false
#define false
Definition: setup_once.h:323
mutex
static uv_mutex_t mutex
Definition: threadpool.c:34
client
Definition: examples/python/async_streaming/client.py:1
server_port
static int server_port
Definition: bad_server_response_test.cc:86
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
grpc::testing::WorkerServiceImpl::InstanceGuard::impl_
WorkerServiceImpl *const impl_
Definition: qps_worker.cc:169
grpc::testing::CreateAsyncGenericServer
std::unique_ptr< Server > CreateAsyncGenericServer(const ServerConfig &config)
Definition: server_async.cc:588
qps_worker.h
grpc::ServerReaderWriter
Definition: grpcpp/impl/codegen/sync_stream.h:786
grpc::testing::QpsWorker::~QpsWorker
~QpsWorker()
Definition: qps_worker.cc:304
grpc::testing::QpsWorker::server_
std::unique_ptr< grpc::Server > server_
Definition: qps_worker.h:54
grpc::testing::QpsWorker::MarkDone
void MarkDone()
Definition: qps_worker.cc:309
grpc::testing::ScopedProfile::ScopedProfile
ScopedProfile(const char *filename, bool enable)
Definition: qps_worker.cc:92
grpc::testing::QpsWorker::QpsWorker
QpsWorker(int driver_port, int server_port, const std::string &credential_type)
Definition: qps_worker.cc:277
create_test_channel.h
server_address
std::string server_address("0.0.0.0:10000")
GRPC_ARG_ALLOW_REUSEPORT
#define GRPC_ARG_ALLOW_REUSEPORT
Definition: grpc_types.h:295
qps_server_builder.h
grpc_profiler_start
void grpc_profiler_start(const char *filename)
Definition: grpc_profiler.cc:30
grpc::testing::QpsWorker::impl_
std::unique_ptr< WorkerServiceImpl > impl_
Definition: qps_worker.h:53
grpc::testing::WorkerServiceImpl::QuitWorker
Status QuitWorker(ServerContext *, const Void *, Void *) override
Definition: qps_worker.cc:144
profile_analyzer.builder
builder
Definition: profile_analyzer.py:159
grpc.StatusCode.RESOURCE_EXHAUSTED
tuple RESOURCE_EXHAUSTED
Definition: src/python/grpcio/grpc/__init__.py:270
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc::testing::WorkerServiceImpl::server_port_
int server_port_
Definition: qps_worker.cc:273
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc::testing::CreateClient
static std::unique_ptr< Client > CreateClient(const ClientConfig &config)
Definition: qps_worker.cc:52
grpc.StatusCode.UNKNOWN
tuple UNKNOWN
Definition: src/python/grpcio/grpc/__init__.py:262
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
worker
Definition: worker.py:1
grpc::testing::WorkerServiceImpl::acquired_
bool acquired_
Definition: qps_worker.cc:272
http2_server_health_check.resp
resp
Definition: http2_server_health_check.py:31
grpc::testing::QpsWorker
Definition: qps_worker.h:39
tests.stress.unary_stream_benchmark.profile
def profile(message_size, response_count)
Definition: unary_stream_benchmark.py:78
grpc::testing::QpsWorker::done_
gpr_atm done_
Definition: qps_worker.h:56
grpc.h
gpr_atm_acq_load
#define gpr_atm_acq_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:52
framework.rpc.grpc_csds.ClientConfig
ClientConfig
Definition: grpc_csds.py:40
grpc_core::JoinHostPort
std::string JoinHostPort(absl::string_view host, int port)
Definition: host_port.cc:32
grpc::testing::WorkerServiceImpl::TryAcquireInstance
bool TryAcquireInstance()
Definition: qps_worker.cc:173
grpc::testing::WorkerServiceImpl::RunServer
Status RunServer(ServerContext *ctx, ServerReaderWriter< ServerStatus, ServerArgs > *stream) override
Definition: qps_worker.cc:123
grpc::testing::WorkerServiceImpl::InstanceGuard::acquired_
const bool acquired_
Definition: qps_worker.cc:170
cpu.h
grpc::testing::WorkerServiceImpl::mu_
std::mutex mu_
Definition: qps_worker.cc:271
grpc::testing::CreateCallbackServer
std::unique_ptr< Server > CreateCallbackServer(const ServerConfig &config)
Definition: test/cpp/qps/server_callback.cc:133
grpc::testing::g_inproc_servers
std::vector< grpc::testing::Server * > * g_inproc_servers
Definition: driver.cc:363
gpr_atm_rel_store
#define gpr_atm_rel_store(p, value)
Definition: impl/codegen/atm_gcc_atomic.h:54
grpc::testing::ScopedProfile::enable_
const bool enable_
Definition: qps_worker.cc:100
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
grpc_profiler.h
host_port.h
grpc::testing::WorkerServiceImpl::RunServerBody
Status RunServerBody(ServerContext *, ServerReaderWriter< ServerStatus, ServerArgs > *stream)
Definition: qps_worker.cc:226
grpc::testing::WorkerServiceImpl::InstanceGuard
Definition: qps_worker.cc:156
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
g
struct @717 g
histogram.h
server_credentials.h
grpc::testing::CreateSynchronousServer
std::unique_ptr< Server > CreateSynchronousServer(const ServerConfig &config)
Definition: server_sync.cc:191
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
grpc::testing::CreateSynchronousClient
std::unique_ptr< Client > CreateSynchronousClient(const ClientConfig &config)
Definition: client_sync.cc:404
grpc::testing::CreateQpsServerBuilder
std::unique_ptr< ServerBuilder > CreateQpsServerBuilder()
Definition: qps_server_builder.cc:37
client_context.h
grpc::testing::WorkerServiceImpl
Definition: qps_worker.cc:103
server
Definition: examples/python/async_streaming/server.py:1
grpc::testing::ScopedProfile::~ScopedProfile
~ScopedProfile()
Definition: qps_worker.cc:95
grpc::testing::GetCredentialsProvider
CredentialsProvider * GetCredentialsProvider()
Definition: test_credentials_provider.cc:169
grpc::testing::ScopedProfile
Definition: qps_worker.cc:90
ret
UniquePtr< SSL_SESSION > ret
Definition: ssl_x509.cc:1029
alloc.h
grpc::protobuf::util::Status
GRPC_CUSTOM_UTIL_STATUS Status
Definition: include/grpcpp/impl/codegen/config_protobuf.h:93
grpc.StatusCode.INVALID_ARGUMENT
tuple INVALID_ARGUMENT
Definition: src/python/grpcio/grpc/__init__.py:263
grpc_profiler_stop
void grpc_profiler_stop(void)
Definition: grpc_profiler.cc:44
grpc::Status
Definition: include/grpcpp/impl/codegen/status.h:35
config_s
Definition: bloaty/third_party/zlib/deflate.c:120
grpc::testing::CreateCallbackClient
std::unique_ptr< Client > CreateCallbackClient(const ClientConfig &config)
Definition: test/cpp/qps/client_callback.cc:371
grpc::testing::WorkerServiceImpl::ReleaseInstance
void ReleaseInstance()
Definition: qps_worker.cc:180
server.h
grpc::testing::WorkerServiceImpl::worker_
QpsWorker * worker_
Definition: qps_worker.cc:274
grpc::testing::CreateServer
static std::unique_ptr< Server > CreateServer(const ServerConfig &config)
Definition: qps_worker.cc:72
grpc::testing::WorkerServiceImpl::CoreCount
Status CoreCount(ServerContext *, const CoreRequest *, CoreResponse *resp) override
Definition: qps_worker.cc:138
grpc::testing::QpsWorker::Done
bool Done() const
Definition: qps_worker.cc:306
grpc::testing::CreateAsyncClient
std::unique_ptr< Client > CreateAsyncClient(const ClientConfig &config)
Definition: client_async.cc:934
client.h
server_builder.h
grpc::testing::WorkerServiceImpl::InstanceGuard::InstanceGuard
InstanceGuard(WorkerServiceImpl *impl)
Definition: qps_worker.cc:158
server.h
grpc::testing::CreateAsyncServer
std::unique_ptr< Server > CreateAsyncServer(const ServerConfig &config)
Definition: server_async.cc:575
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:50