21 #include <condition_variable>
31 #include "absl/algorithm/container.h"
32 #include "absl/flags/flag.h"
33 #include "absl/strings/str_split.h"
44 #include "src/proto/grpc/testing/empty.pb.h"
45 #include "src/proto/grpc/testing/messages.pb.h"
46 #include "src/proto/grpc/testing/test.grpc.pb.h"
51 "Fail client if any RPCs fail after first successful RPC.");
// Command-line flag `print_response` (bool, default false). Per its help
// text, enabling it makes the client write RPC responses to stdout.
53 ABSL_FLAG(
bool, print_response,
false,
"Write RPC response to stdout.");
59 "Port to expose peer distribution stats service.");
61 "a comma separated list of rpc methods.");
64 "RPC status for the test RPC to be considered successful");
66 bool, secure_mode,
false,
67 "If true, XdsCredentials are used, InsecureChannelCredentials otherwise");
77 using grpc::testing::ClientConfigureRequest;
78 using grpc::testing::ClientConfigureRequest_RpcType_Name;
79 using grpc::testing::ClientConfigureResponse;
80 using grpc::testing::Empty;
81 using grpc::testing::LoadBalancerAccumulatedStatsRequest;
83 using grpc::testing::LoadBalancerStatsRequest;
85 using grpc::testing::LoadBalancerStatsService;
88 using grpc::testing::TestService;
89 using grpc::testing::XdsUpdateClientConfigureService;
110 ClientConfigureRequest::RpcType
type;
111 std::vector<std::pair<std::string, std::string>>
metadata;
128 std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
145 (start_id_ <= call->saved_request_id &&
148 std::lock_guard<std::mutex> lock(
m_);
161 stats_per_method[ClientConfigureRequest_RpcType_Name(
163 auto&
result = *method_stat.mutable_result();
177 std::unique_lock<std::mutex> lock(
m_);
182 auto& response_rpcs_by_method = *
response->mutable_rpcs_by_method();
185 if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
187 }
else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
195 auto& response_rpc_by_method = response_rpcs_by_method[
method_name];
196 auto& response_rpcs_by_peer =
197 *response_rpc_by_method.mutable_rpcs_by_peer();
198 for (
const auto& rpc_by_peer : rpc_by_type.second) {
199 auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
200 response_rpc_by_peer = rpc_by_peer.second;
208 std::unique_lock<std::mutex> lock(
m_);
212 auto& response_rpcs_started_by_method =
213 *
response->mutable_num_rpcs_started_by_method();
214 auto& response_rpcs_succeeded_by_method =
215 *
response->mutable_num_rpcs_succeeded_by_method();
216 auto& response_rpcs_failed_by_method =
217 *
response->mutable_num_rpcs_failed_by_method();
219 auto total_succeeded = 0;
220 for (
const auto& rpc_by_peer : rpc_by_type.second) {
221 total_succeeded += rpc_by_peer.second;
223 response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
224 rpc_by_type.first)] = total_succeeded;
225 response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
226 rpc_by_type.first)] =
228 response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
// Condition variable member. NOTE(review): presumably paired with the mutex
// m_ that other code in this file locks; the wait/notify sites are not visible
// in this excerpt, so confirm the exact predicate before relying on it.
247 std::condition_variable
cv_;
258 int saved_request_id;
274 if (
data.first ==
"rpc-behavior" &&
data.second ==
"keep-open") {
279 call->context.set_deadline(deadline);
280 call->saved_request_id = saved_request_id;
281 call->rpc_type = ClientConfigureRequest::UNARY_CALL;
282 call->simple_response_reader =
stub_->PrepareAsyncUnaryCall(
283 &
call->context, SimpleRequest::default_instance(), &
cq_);
284 call->simple_response_reader->StartCall();
285 call->simple_response_reader->Finish(&
call->simple_response, &
call->status,
291 int saved_request_id;
307 if (
data.first ==
"rpc-behavior" &&
data.second ==
"keep-open") {
312 call->context.set_deadline(deadline);
313 call->saved_request_id = saved_request_id;
314 call->rpc_type = ClientConfigureRequest::EMPTY_CALL;
315 call->empty_response_reader =
stub_->PrepareAsyncEmptyCall(
317 call->empty_response_reader->StartCall();
318 call->empty_response_reader->Finish(&
call->empty_response, &
call->status,
330 auto server_initial_metadata =
call->context.GetServerInitialMetadata();
331 auto metadata_hostname =
332 call->context.GetServerInitialMetadata().find(
"hostname");
334 metadata_hostname !=
call->context.GetServerInitialMetadata().end()
336 metadata_hostname->second.length())
337 :
call->simple_response.hostname();
346 std::cout <<
"RPC failed: " <<
call->status.error_code() <<
": "
347 <<
call->status.error_message() << std::endl;
355 auto metadata_hostname =
356 call->context.GetServerInitialMetadata().find(
"hostname");
359 call->context.GetServerInitialMetadata().end()
361 metadata_hostname->second.length())
362 :
call->simple_response.hostname();
363 std::cout <<
"Greeting: Hello world, this is " << hostname
364 <<
", from " <<
call->context.peer() << std::endl;
// Uniquely-owned TestService stub. The async calls issued in this file are
// prepared through it (stub_->PrepareAsyncUnaryCall and
// stub_->PrepareAsyncEmptyCall).
382 std::unique_ptr<TestService::Stub>
stub_;
393 const LoadBalancerStatsRequest*
request,
401 end_id = start_id +
request->num_rpcs();
416 const LoadBalancerAccumulatedStatsRequest* ,
429 :
public XdsUpdateClientConfigureService::Service {
436 const ClientConfigureRequest*
request,
437 ClientConfigureResponse* )
override {
438 std::map<int, std::vector<std::pair<std::string, std::string>>>
441 metadata_map[
data.type()].push_back({
data.key(),
data.value()});
443 std::vector<RpcConfig>
configs;
444 for (
const auto& rpc :
request->types()) {
447 config.type =
static_cast<ClientConfigureRequest::RpcType
>(rpc);
448 auto metadata_iter = metadata_map.find(rpc);
449 if (metadata_iter != metadata_map.end()) {
450 config.metadata = metadata_iter->second;
455 std::lock_guard<std::mutex> lock(
466 void RunTestLoop(std::chrono::duration<double> duration_per_query,
479 std::chrono::time_point<std::chrono::system_clock>
start =
481 std::chrono::duration<double> elapsed;
485 std::vector<RpcConfig>
configs;
488 std::lock_guard<std::mutex> lockk(
497 if (elapsed > duration_per_query) {
500 if (
config.type == ClientConfigureRequest::EMPTY_CALL) {
502 }
else if (
config.type == ClientConfigureRequest::UNARY_CALL) {
524 builder.RegisterService(&stats_service);
525 builder.RegisterService(&client_config_service);
541 std::vector<std::string> rpc_metadata =
543 std::map<int, std::vector<std::pair<std::string, std::string>>> metadata_map;
544 for (
auto&
data : rpc_metadata) {
549 metadata_map[ClientConfigureRequest::EMPTY_CALL].push_back(
551 }
else if (
metadata[0] ==
"UnaryCall") {
552 metadata_map[ClientConfigureRequest::UNARY_CALL].push_back(
558 std::vector<RpcConfig>
configs;
559 std::vector<std::string> rpc_methods =
561 for (
const std::string& rpc_method : rpc_methods) {
563 if (rpc_method ==
"EmptyCall") {
564 config.type = ClientConfigureRequest::EMPTY_CALL;
565 }
else if (rpc_method ==
"UnaryCall") {
566 config.type = ClientConfigureRequest::UNARY_CALL;
570 auto metadata_iter = metadata_map.find(
config.type);
571 if (metadata_iter != metadata_map.end()) {
572 config.metadata = metadata_iter->second;
582 int main(
int argc,
char** argv) {
593 std::lock_guard<std::mutex> lock(stats_watchers.
mu);
600 std::chrono::duration<double> duration_per_query =
604 std::vector<std::thread> test_threads;
608 &stats_watchers, &rpc_config_queue));
614 for (
auto it = test_threads.begin();
it != test_threads.end();
it++) {