channel_cc.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <atomic>
20 #include <cstring>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
26 #include <grpc/grpc.h>
30 #include <grpc/slice.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpcpp/channel.h>
34 #include <grpcpp/client_context.h>
36 #include <grpcpp/impl/call.h>
43 #include <grpcpp/impl/rpc_method.h>
45 #include <grpcpp/support/config.h>
46 #include <grpcpp/support/slice.h>
47 
49 
50 namespace grpc {
51 
54  const std::string& host, grpc_channel* channel,
55  std::vector<
56  std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
57  interceptor_creators)
58  : host_(host), c_channel_(channel) {
59  interceptor_creators_ = std::move(interceptor_creators);
61 }
62 
65  CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
66  if (callback_cq != nullptr) {
68  // gRPC-core provides the backing needed for the preferred CQ type
69  callback_cq->Shutdown();
70  } else {
72  }
73  }
74 }
75 
76 namespace {
77 
78 inline grpc_slice SliceFromArray(const char* arr, size_t len) {
80 }
81 
82 std::string GetChannelInfoField(grpc_channel* channel,
83  grpc_channel_info* channel_info,
84  char*** channel_info_field) {
85  char* value = nullptr;
86  memset(channel_info, 0, sizeof(*channel_info));
87  *channel_info_field = &value;
88  grpc_channel_get_info(channel, channel_info);
89  if (value == nullptr) return "";
91  gpr_free(value);
92  return result;
93 }
94 
95 } // namespace
96 
98  grpc_channel_info channel_info;
99  return GetChannelInfoField(c_channel_, &channel_info,
100  &channel_info.lb_policy_name);
101 }
102 
104  grpc_channel_info channel_info;
105  return GetChannelInfoField(c_channel_, &channel_info,
106  &channel_info.service_config_json);
107 }
108 
109 namespace experimental {
110 
113 }
114 
115 } // namespace experimental
116 
119  grpc::CompletionQueue* cq, size_t interceptor_pos) {
120  const bool kRegistered = method.channel_tag() && context->authority().empty();
121  grpc_call* c_call = nullptr;
122  if (kRegistered) {
126  method.channel_tag(), context->raw_deadline(), nullptr);
127  } else {
128  const ::std::string* host_str = nullptr;
129  if (!context->authority_.empty()) {
130  host_str = &context->authority_;
131  } else if (!host_.empty()) {
132  host_str = &host_;
133  }
134  grpc_slice method_slice =
135  SliceFromArray(method.name(), strlen(method.name()));
136  grpc_slice host_slice;
137  if (host_str != nullptr) {
138  host_slice = grpc::SliceFromCopiedString(*host_str);
139  }
140  c_call = grpc_channel_create_call(
142  context->propagation_options_.c_bitmask(), cq->cq(), method_slice,
143  host_str == nullptr ? nullptr : &host_slice, context->raw_deadline(),
144  nullptr);
145  grpc_slice_unref(method_slice);
146  if (host_str != nullptr) {
147  grpc_slice_unref(host_slice);
148  }
149  }
151 
152  // ClientRpcInfo should be set before call because set_call also checks
153  // whether the call has been cancelled, and if the call was cancelled, we
154  // should notify the interceptors too.
155  auto* info = context->set_client_rpc_info(
156  method.name(), method.suffix_for_stats(), method.method_type(), this,
157  interceptor_creators_, interceptor_pos);
158  context->set_call(c_call, shared_from_this());
159 
160  return grpc::internal::Call(c_call, this, cq, info);
161 }
162 
165  CompletionQueue* cq) {
166  return CreateCallInternal(method, context, cq, 0);
167 }
168 
171  ops->FillOps(
172  call); // Make a copy of call. It's fine since Call just has pointers
173 }
174 
175 void* Channel::RegisterMethod(const char* method) {
177  c_channel_, method, host_.empty() ? nullptr : host_.c_str(), nullptr);
178 }
179 
182 }
183 
184 namespace {
185 
186 class TagSaver final : public grpc::internal::CompletionQueueTag {
187  public:
188  explicit TagSaver(void* tag) : tag_(tag) {}
189  ~TagSaver() override {}
190  bool FinalizeResult(void** tag, bool* /*status*/) override {
191  *tag = tag_;
192  delete this;
193  return true;
194  }
195 
196  private:
197  void* tag_;
198 };
199 
200 } // namespace
201 
203  gpr_timespec deadline,
204  grpc::CompletionQueue* cq, void* tag) {
205  TagSaver* tag_saver = new TagSaver(tag);
206  grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline,
207  cq->cq(), tag_saver);
208 }
209 
211  gpr_timespec deadline) {
213  bool ok = false;
214  void* tag = nullptr;
215  NotifyOnStateChangeImpl(last_observed, deadline, &cq, nullptr);
216  cq.Next(&tag, &ok);
217  GPR_ASSERT(tag == nullptr);
218  return ok;
219 }
220 
221 namespace {
222 class ShutdownCallback : public grpc_completion_queue_functor {
223  public:
224  ShutdownCallback() {
225  functor_run = &ShutdownCallback::Run;
226  // Set inlineable to true since this callback is trivial and thus does not
227  // need to be run from the executor (triggering a thread hop). This should
228  // only be used by internal callbacks like this and not by user application
229  // code.
230  inlineable = true;
231  }
232  // TakeCQ takes ownership of the cq into the shutdown callback
233  // so that the shutdown callback will be responsible for destroying it
234  void TakeCQ(grpc::CompletionQueue* cq) { cq_ = cq; }
235 
236  // The Run function will get invoked by the completion queue library
237  // when the shutdown is actually complete
238  static void Run(grpc_completion_queue_functor* cb, int) {
239  auto* callback = static_cast<ShutdownCallback*>(cb);
240  delete callback->cq_;
241  delete callback;
242  }
243 
244  private:
245  grpc::CompletionQueue* cq_ = nullptr;
246 };
247 } // namespace
248 
250  // TODO(vjpai): Consider using a single global CQ for the default CQ
251  // if there is no explicit per-channel CQ registered
252  CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire);
253  if (callback_cq != nullptr) {
254  return callback_cq;
255  }
256  // The callback_cq_ wasn't already set, so grab a lock and set it up exactly
257  // once for this channel.
259  callback_cq = callback_cq_.load(std::memory_order_relaxed);
260  if (callback_cq == nullptr) {
262  // gRPC-core provides the backing needed for the preferred CQ type
263 
264  auto* shutdown_callback = new ShutdownCallback;
267  shutdown_callback});
268 
269  // Transfer ownership of the new cq to its own shutdown callback
270  shutdown_callback->TakeCQ(callback_cq);
271  } else {
272  // Otherwise we need to use the alternative CQ variant
274  }
275  callback_cq_.store(callback_cq, std::memory_order_release);
276  }
277  return callback_cq;
278 }
279 
280 } // namespace grpc
grpc::ClientContext::census_context
struct census_context * census_context() const
Returns the census context that has been set, or nullptr if not set.
Definition: grpcpp/impl/codegen/client_context.h:378
grpc_slice_unref
GPRAPI void grpc_slice_unref(grpc_slice s)
Definition: slice_api.cc:32
cq_
grpc_completion_queue * cq_
Definition: channel_connectivity.cc:210
grpc::Channel::interceptor_creators_
std::vector< std::unique_ptr< grpc::experimental::ClientInterceptorFactoryInterface > > interceptor_creators_
Definition: include/grpcpp/channel.h:121
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
iomgr.h
grpc::Channel::NotifyOnStateChangeImpl
void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, grpc::CompletionQueue *cq, void *tag) override
Definition: channel_cc.cc:202
grpc::Channel::PerformOpsOnCall
void PerformOpsOnCall(grpc::internal::CallOpSetInterface *ops, grpc::internal::Call *call) override
Definition: channel_cc.cc:169
grpc::ClientContext::set_call
void set_call(grpc_call *call, const std::shared_ptr< grpc::Channel > &channel)
Definition: client_context.cc:126
log.h
grpc_channel_get_info
GRPCAPI void grpc_channel_get_info(grpc_channel *channel, const grpc_channel_info *channel_info)
Definition: channel.cc:264
grpc::gpr_free
gpr_free(creds_file_name)
grpc::experimental::ChannelResetConnectionBackoff
void ChannelResetConnectionBackoff(Channel *channel)
Definition: channel_cc.cc:111
memset
return memset(p, 0, total)
grpc
Definition: grpcpp/alarm.h:33
grpc_channel_reset_connect_backoff
GRPCAPI void grpc_channel_reset_connect_backoff(grpc_channel *channel)
Definition: channel.cc:273
slice.h
grpc_census_call_set_context
GRPCAPI void grpc_census_call_set_context(grpc_call *call, struct census_context *context)
Definition: grpc_context.cc:30
grpc::ClientContext::authority_
grpc::string authority_
Definition: grpcpp/impl/codegen/client_context.h:498
grpc::internal::GrpcLibraryInitializer::summon
int summon()
Definition: grpcpp/impl/grpc_library.h:54
grpc::Channel::callback_cq_
std::atomic< CompletionQueue * > callback_cq_
Definition: include/grpcpp/channel.h:117
grpc::g_gli_initializer
static grpc::internal::GrpcLibraryInitializer g_gli_initializer
Definition: channel_cc.cc:52
grpc::internal::CallOpSetInterface
Definition: call_op_set_interface.h:36
grpc_channel_check_connectivity_state
GRPCAPI grpc_connectivity_state grpc_channel_check_connectivity_state(grpc_channel *channel, int try_to_connect)
Definition: channel_connectivity.cc:56
grpc::SliceFromCopiedString
grpc_slice SliceFromCopiedString(const std::string &str)
Definition: include/grpcpp/impl/codegen/slice.h:149
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
slice.h
core_codegen_interface.h
rpc_method.h
grpc::Channel
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: include/grpcpp/channel.h:54
grpc::internal::MutexLock
Definition: include/grpcpp/impl/codegen/sync.h:86
grpc_connectivity_state
grpc_connectivity_state
Definition: include/grpc/impl/codegen/connectivity_state.h:30
call
FilterStackCall * call
Definition: call.cc:750
grpc_channel_register_call
GRPCAPI void * grpc_channel_register_call(grpc_channel *channel, const char *method, const char *host, void *reserved)
Definition: channel.cc:365
grpc_types.h
grpc_iomgr_run_in_background
bool grpc_iomgr_run_in_background()
grpc::g_core_codegen_interface
CoreCodegenInterface * g_core_codegen_interface
Definition: include/grpcpp/impl/codegen/completion_queue.h:98
grpc::ClientContext::authority
std::string authority()
Definition: grpcpp/impl/codegen/client_context.h:482
grpc::ClientContext::raw_deadline
gpr_timespec raw_deadline() const
Return a gpr_timespec representation of the client call's deadline.
Definition: grpcpp/impl/codegen/client_context.h:299
client_interceptor.h
grpc::internal::GrpcLibraryInitializer
Instantiating this class ensures the proper initialization of gRPC.
Definition: grpcpp/impl/grpc_library.h:39
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
grpc::Channel::GetLoadBalancingPolicyName
std::string GetLoadBalancingPolicyName() const
Returns the LB policy name, or the empty string if not yet available.
Definition: channel_cc.cc:97
sync.h
GRPC_CQ_DEFAULT_POLLING
@ GRPC_CQ_DEFAULT_POLLING
Definition: grpc_types.h:743
grpc::Channel::GetState
grpc_connectivity_state GetState(bool try_to_connect) override
Definition: channel_cc.cc:180
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc::internal::RpcMethod
Descriptor of an RPC method.
Definition: grpcpp/impl/codegen/rpc_method.h:31
tag
static void * tag(intptr_t t)
Definition: bad_client.cc:318
grpc::CompletionQueue::ReleaseCallbackAlternativeCQ
static void ReleaseCallbackAlternativeCQ(CompletionQueue *cq)
Definition: completion_queue_cc.cc:202
grpc.h
grpc_call
struct grpc_call grpc_call
Definition: grpc_types.h:70
connectivity_state.h
completion_queue.h
grpc::CoreCodegenInterface::grpc_slice_from_copied_buffer
virtual grpc_slice grpc_slice_from_copied_buffer(const void *buffer, size_t length)=0
completion_queue_tag.h
grpc::CompletionQueue::CallbackAlternativeCQ
static CompletionQueue * CallbackAlternativeCQ()
Definition: completion_queue_cc.cc:196
channel.h
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
grpc::Channel::c_channel_
grpc_channel *const c_channel_
Definition: include/grpcpp/channel.h:108
grpc::Channel::~Channel
~Channel() override
Definition: channel_cc.cc:63
callback
static void callback(void *arg, int status, int timeouts, struct hostent *host)
Definition: acountry.c:224
call_op_set_interface.h
config.h
grpc::internal::Call
Straightforward wrapping of the C call object.
Definition: include/grpcpp/impl/codegen/call.h:37
grpc_completion_queue_attributes
Definition: grpc_types.h:791
grpc::Channel::mu_
grpc::internal::Mutex mu_
Definition: include/grpcpp/channel.h:111
gpr_types.h
grpc::ClientContext
Definition: grpcpp/impl/codegen/client_context.h:195
value
const char * value
Definition: hpack_parser_table.cc:165
grpc_channel_create_registered_call
GRPCAPI grpc_call * grpc_channel_create_registered_call(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, void *registered_call_handle, gpr_timespec deadline, void *reserved)
Definition: channel.cc:394
grpc_channel_create_call
GRPCAPI grpc_call * grpc_channel_create_call(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, grpc_slice method, const grpc_slice *host, gpr_timespec deadline, void *reserved)
Definition: channel.cc:311
grpc::Channel::Channel
Channel(const std::string &host, grpc_channel *c_channel, std::vector< std::unique_ptr< grpc::experimental::ClientInterceptorFactoryInterface >> interceptor_creators)
Definition: channel_cc.cc:53
client_context.h
grpc_channel_info::service_config_json
char ** service_config_json
Definition: grpc_types.h:726
call.h
grpc_library.h
tests.unit._server_ssl_cert_config_test.Call
Call
Definition: _server_ssl_cert_config_test.py:70
GRPC_CQ_CURRENT_VERSION
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:789
grpc_channel_info::lb_policy_name
char ** lb_policy_name
Definition: grpc_types.h:723
grpc::Channel::WaitForStateChangeImpl
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) override
Definition: channel_cc.cc:210
alloc.h
call_op_set.h
grpc::CompletionQueue::Shutdown
void Shutdown()
Definition: completion_queue_cc.cc:137
grpc::ClientContext::propagate_from_call_
grpc_call * propagate_from_call_
Definition: grpcpp/impl/codegen/client_context.h:506
ok
bool ok
Definition: async_end2end_test.cc:197
grpc_completion_queue_functor
Definition: grpc_types.h:773
grpc_channel_destroy
GRPCAPI void grpc_channel_destroy(grpc_channel *channel)
Definition: channel.cc:437
grpc::Channel::RegisterMethod
void * RegisterMethod(const char *method) override
Definition: channel_cc.cc:175
grpc_channel
struct grpc_channel grpc_channel
Definition: grpc_types.h:62
grpc_channel_watch_connectivity_state
GRPCAPI void grpc_channel_watch_connectivity_state(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag)
Definition: channel_connectivity.cc:227
tag_
void * tag_
Definition: channel_connectivity.cc:211
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
grpc::Channel::GetServiceConfigJSON
std::string GetServiceConfigJSON() const
Definition: channel_cc.cc:103
grpc::Channel::CreateCallInternal
grpc::internal::Call CreateCallInternal(const grpc::internal::RpcMethod &method, grpc::ClientContext *context, grpc::CompletionQueue *cq, size_t interceptor_pos) override
Definition: channel_cc.cc:117
grpc::Channel::CreateCall
grpc::internal::Call CreateCall(const grpc::internal::RpcMethod &method, grpc::ClientContext *context, grpc::CompletionQueue *cq) override
Definition: channel_cc.cc:163
GRPC_CQ_CALLBACK
@ GRPC_CQ_CALLBACK
Definition: grpc_types.h:766
grpc::CompletionQueue
Definition: include/grpcpp/impl/codegen/completion_queue.h:104
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
gpr_timespec
Definition: gpr_types.h:50
googletest-break-on-failure-unittest.Run
def Run(command)
Definition: bloaty/third_party/googletest/googletest/test/googletest-break-on-failure-unittest.py:76
tests.unit._exit_scenarios.try_to_connect
try_to_connect
Definition: _exit_scenarios.py:189
grpc::internal::CompletionQueueTag
An interface allowing implementors to process and filter event tags.
Definition: grpcpp/impl/codegen/completion_queue_tag.h:28
grpc::ClientContext::propagation_options_
PropagationOptions propagation_options_
Definition: grpcpp/impl/codegen/client_context.h:507
grpc::Channel::CallbackCQ
grpc::CompletionQueue * CallbackCQ() override
Definition: channel_cc.cc:249
method
NSString * method
Definition: ProtoMethod.h:28
grpc_channel_info
Definition: grpc_types.h:720
ops
static grpc_op ops[6]
Definition: test/core/fling/client.cc:39
grpc::ClientContext::set_client_rpc_info
grpc::experimental::ClientRpcInfo * set_client_rpc_info(const char *method, const char *suffix_for_stats, grpc::internal::RpcMethod::RpcType type, grpc::ChannelInterface *channel, const std::vector< std::unique_ptr< grpc::experimental::ClientInterceptorFactoryInterface >> &creators, size_t interceptor_pos)
Definition: grpcpp/impl/codegen/client_context.h:462
cq
static grpc_completion_queue * cq
Definition: test/core/fling/client.cc:37
cb
OPENSSL_EXPORT pem_password_cb * cb
Definition: pem.h:351
grpc::Channel::host_
const std::string host_
Definition: include/grpcpp/channel.h:107
grpc::PropagationOptions::c_bitmask
uint32_t c_bitmask() const
Definition: grpcpp/impl/codegen/client_context.h:173


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:52