client_filter.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include <algorithm>
27 #include <string>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/status/status.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/time/clock.h"
35 #include "absl/time/time.h"
36 #include "absl/types/optional.h"
37 #include "opencensus/stats/stats.h"
38 #include "opencensus/tags/tag_key.h"
39 #include "opencensus/tags/tag_map.h"
40 #include "opencensus/trace/span.h"
41 #include "opencensus/trace/span_context.h"
42 #include "opencensus/trace/status_code.h"
43 
45 #include <grpc/slice.h>
46 #include <grpc/support/log.h>
47 #include <grpcpp/support/config.h>
48 
60 
61 namespace grpc {
62 
63 constexpr uint32_t
65 constexpr uint32_t
67 
69  grpc_call_element* /* elem */, const grpc_call_element_args* args) {
70  tracer_ = args->arena->New<OpenCensusCallTracer>(args);
71  GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
72  args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer_;
73  args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) {
74  (static_cast<OpenCensusCallTracer*>(tracer))->~OpenCensusCallTracer();
75  };
76  return GRPC_ERROR_NONE;
77 }
78 
81  // Note that we are generating the overall call context here instead of in
82  // the constructor of `OpenCensusCallTracer` due to the semantics of
83  // `grpc_census_call_set_context` which allows the application to set the
84  // census context for a call anytime before the first call to
85  // `grpc_call_start_batch`.
86  if (op->op()->send_initial_metadata) {
88  }
90 }
91 
92 //
93 // OpenCensusCallTracer::OpenCensusCallAttemptTracer
94 //
95 
96 namespace {
97 
98 CensusContext CreateCensusContextForCallAttempt(
99  absl::string_view method, const CensusContext& parent_context) {
100  GPR_DEBUG_ASSERT(parent_context.Context().IsValid());
101  return CensusContext(absl::StrCat("Attempt.", method), &parent_context.Span(),
102  parent_context.tags());
103 }
104 
105 } // namespace
106 
108  OpenCensusCallTracer* parent, uint64_t attempt_num,
109  bool is_transparent_retry, bool arena_allocated)
110  : parent_(parent),
111  arena_allocated_(arena_allocated),
112  context_(CreateCensusContextForCallAttempt(parent_->method_,
113  parent_->context_)),
114  start_time_(absl::Now()) {
115  context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
116  context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
117 }
118 
121  uint32_t /*flags*/) {
122  char tracing_buf[kMaxTraceContextLen];
123  size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf,
124  kMaxTraceContextLen);
125  if (tracing_len > 0) {
128  grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
129  }
131  // TODO(unknown): Add in tagging serialization.
132  size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
133  if (encoded_tags_len > 0) {
136  }
137 }
138 
140  const grpc_core::SliceBuffer& /*send_message*/) {
141  ++sent_message_count_;
142 }
143 
145  const grpc_core::SliceBuffer& /*recv_message*/) {
146  ++recv_message_count_;
147 }
148 
149 namespace {
150 
151 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
152  absl::optional<grpc_core::Slice> grpc_server_stats_bin =
154  if (grpc_server_stats_bin.has_value()) {
156  reinterpret_cast<const char*>(grpc_server_stats_bin->data()),
157  grpc_server_stats_bin->size(), elapsed_time);
158  }
159 }
160 
161 } // namespace
162 
165  absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
166  const grpc_transport_stream_stats* transport_stream_stats) {
167  status_code_ = status.code();
168  if (recv_trailing_metadata == nullptr || transport_stream_stats == nullptr) {
169  return;
170  }
171  uint64_t elapsed_time = 0;
172  FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time);
173  std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
174  context_.tags().tags();
175  tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
176  std::string final_status = absl::StatusCodeToString(status_code_);
177  tags.emplace_back(ClientStatusTagKey(), final_status);
178  ::opencensus::stats::Record(
180  static_cast<double>(transport_stream_stats->outgoing.data_bytes)},
182  static_cast<double>(transport_stream_stats->incoming.data_bytes)},
184  ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time))}},
185  tags);
186 }
187 
189  grpc_error_handle cancel_error) {
190  status_code_ = absl::StatusCode::kCancelled;
191  GRPC_ERROR_UNREF(cancel_error);
192 }
193 
195  const gpr_timespec& /*latency*/) {
196  double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
197  std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
198  context_.tags().tags();
199  tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
200  tags.emplace_back(ClientStatusTagKey(), StatusCodeToString(status_code_));
201  ::opencensus::stats::Record(
202  {{RpcClientRoundtripLatency(), latency_ms},
203  {RpcClientSentMessagesPerRpc(), sent_message_count_},
204  {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
205  tags);
206  if (status_code_ != absl::StatusCode::kOk) {
207  context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_),
208  StatusCodeToString(status_code_));
209  }
210  context_.EndSpan();
211  grpc_core::MutexLock lock(&parent_->mu_);
212  if (--parent_->num_active_rpcs_ == 0) {
213  parent_->time_at_last_attempt_end_ = absl::Now();
214  }
215  if (arena_allocated_) {
217  } else {
218  delete this;
219  }
220 }
221 
222 //
223 // OpenCensusCallTracer
224 //
225 
230  arena_(args->arena) {}
231 
233  std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
234  context_.tags().tags();
235  tags.emplace_back(ClientMethodTagKey(), std::string(method_));
236  ::opencensus::stats::Record(
237  {{RpcClientRetriesPerCall(), retries_ - 1}, // exclude first attempt
238  {RpcClientTransparentRetriesPerCall(), transparent_retries_},
240  tags);
241 }
242 
244  auto* parent_context = reinterpret_cast<CensusContext*>(
247  (parent_context == nullptr) ? nullptr : parent_context);
248 }
249 
251 OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
252  // We allocate the first attempt on the arena and all subsequent attempts on
253  // the heap, so that in the common case we don't require a heap allocation,
254  // nor do we unnecessarily grow the arena.
255  bool is_first_attempt = true;
256  uint64_t attempt_num;
257  {
258  grpc_core::MutexLock lock(&mu_);
259  if (transparent_retries_ != 0 || retries_ != 0) {
260  is_first_attempt = false;
261  if (num_active_rpcs_ == 0) {
262  retry_delay_ += absl::Now() - time_at_last_attempt_end_;
263  }
264  }
265  attempt_num = retries_;
266  if (is_transparent_retry) {
267  ++transparent_retries_;
268  } else {
269  ++retries_;
270  }
271  ++num_active_rpcs_;
272  }
273  if (is_first_attempt) {
275  this, attempt_num, is_transparent_retry, true /* arena_allocated */);
276  }
277  return new OpenCensusCallAttemptTracer(
278  this, attempt_num, is_transparent_retry, false /* arena_allocated */);
279 }
280 
281 } // namespace grpc
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer
OpenCensusCallAttemptTracer(OpenCensusCallTracer *parent, uint64_t attempt_num, bool is_transparent_retry, bool arena_allocated)
Definition: client_filter.cc:107
slice.h
grpc::status
auto status
Definition: cpp/client/credentials_test.cc:200
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
metadata_batch.h
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage
void RecordReceivedMessage(const grpc_core::SliceBuffer &) override
Definition: client_filter.cc:144
grpc_slice_ref_internal
const grpc_slice & grpc_slice_ref_internal(const grpc_slice &slice)
Definition: slice_refcount.h:32
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
grpc::RpcClientRetriesPerCall
MeasureInt64 RpcClientRetriesPerCall()
Definition: measures.cc:93
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
grpc
Definition: grpcpp/alarm.h:33
slice.h
GRPC_CONTEXT_CALL_TRACER
@ GRPC_CONTEXT_CALL_TRACER
Value is a CallTracer object.
Definition: core/lib/channel/context.h:40
grpc::RpcClientReceivedBytesPerRpc
MeasureDouble RpcClientReceivedBytesPerRpc()
Definition: measures.cc:53
grpc_core::Slice
Definition: src/core/lib/slice/slice.h:282
grpc_core::MutexLock
Definition: src/core/lib/gprpp/sync.h:88
absl::StatusCodeToString
ABSL_NAMESPACE_BEGIN std::string StatusCodeToString(StatusCode code)
Definition: third_party/abseil-cpp/absl/status/status.cc:33
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen
static constexpr uint32_t kMaxTagsLen
Definition: open_census_call_tracer.h:79
absl::Nanoseconds
constexpr Duration Nanoseconds(T n)
Definition: third_party/abseil-cpp/absl/time/time.h:407
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen
static constexpr uint32_t kMaxTraceContextLen
Definition: open_census_call_tracer.h:77
grpc::OpenCensusCallTracer::OpenCensusCallTracer
OpenCensusCallTracer(const grpc_call_element_args *args)
Definition: client_filter.cc:226
grpc_core::Arena::New
T * New(Args &&... args)
Definition: src/core/lib/resource_quota/arena.h:77
grpc_transport_stream_stats::outgoing
grpc_transport_one_way_stats outgoing
Definition: transport.h:251
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
arena.h
grpc::ClientMethodTagKey
::opencensus::tags::TagKey ClientMethodTagKey()
Definition: grpc_plugin.cc:80
grpc::CensusClientCallData::tracer_
OpenCensusCallTracer * tracer_
Definition: client_filter.h:44
check_documentation.path
path
Definition: check_documentation.py:57
grpc::CensusContext::EndSpan
void EndSpan()
Definition: cpp/ext/filters/census/context.h:72
grpc_call_element
Definition: channel_stack.h:194
grpc::CensusContext
Definition: cpp/ext/filters/census/context.h:44
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendInitialMetadata
void RecordSendInitialMetadata(grpc_metadata_batch *send_initial_metadata, uint32_t) override
Definition: client_filter.cc:120
grpc::RpcClientRetryDelayPerCall
MeasureDouble RpcClientRetryDelayPerCall()
Definition: measures.cc:109
grpc_core::GrpcServerStatsBinMetadata
Definition: metadata_batch.h:238
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedTrailingMetadata
void RecordReceivedTrailingMetadata(absl::Status status, grpc_metadata_batch *recv_trailing_metadata, const grpc_transport_stream_stats *transport_stream_stats) override
Definition: client_filter.cc:164
grpc::CensusContext::Context
::opencensus::trace::SpanContext Context() const
Definition: cpp/ext/filters/census/context.h:71
tags
bool tags[kAvailableTags]
Definition: inproc_callback_test.cc:114
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage
void RecordSendMessage(const grpc_core::SliceBuffer &) override
Definition: client_filter.cc:139
grpc::TransportStreamOpBatch
A C++ wrapper for the grpc_transport_stream_op_batch struct.
Definition: common/channel_filter.h:99
grpc::OpenCensusCallTracer::GenerateContext
void GenerateContext()
Definition: client_filter.cc:243
context.h
start_time_
Timestamp start_time_
Definition: outlier_detection.cc:379
grpc::RpcClientReceivedMessagesPerRpc
MeasureInt64 RpcClientReceivedMessagesPerRpc()
Definition: measures.cc:85
grpc::OpenCensusCallTracer::method_
absl::string_view method_
Definition: open_census_call_tracer.h:103
grpc::ServerStatsDeserialize
size_t ServerStatsDeserialize(const char *buf, size_t buf_size, uint64_t *server_elapsed_time)
Definition: context.cc:97
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::optional::has_value
constexpr bool has_value() const noexcept
Definition: abseil-cpp/absl/types/optional.h:461
context_
ScopedContext * context_
Definition: filter_fuzzer.cc:559
grpc_core::GrpcTagsBinMetadata
Definition: metadata_batch.h:250
grpc::CensusClientCallData::Init
grpc_error_handle Init(grpc_call_element *, const grpc_call_element_args *args) override
Initializes the call data.
Definition: client_filter.cc:68
grpc::RpcClientRoundtripLatency
MeasureDouble RpcClientRoundtripLatency()
Definition: measures.cc:60
grpc_core::slice_detail::CopyConstructors< Slice >::FromCopiedBuffer
static Slice FromCopiedBuffer(const char *p, size_t len)
Definition: src/core/lib/slice/slice.h:182
grpc::RpcClientServerLatency
MeasureDouble RpcClientServerLatency()
Definition: measures.cc:69
absl::ToDoubleMilliseconds
double ToDoubleMilliseconds(Duration d)
Definition: abseil-cpp/absl/time/duration.cc:593
grpc::TraceContextSerialize
size_t TraceContextSerialize(const ::opencensus::trace::SpanContext &context, char *tracing_buf, size_t tracing_buf_size)
Definition: context.cc:76
grpc::OpenCensusCallTracer::~OpenCensusCallTracer
~OpenCensusCallTracer() override
Definition: client_filter.cc:232
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc::OpenCensusCallTracer::StartNewAttempt
OpenCensusCallAttemptTracer * StartNewAttempt(bool is_transparent_retry) override
Definition: client_filter.cc:251
absl::optional< grpc_core::Slice >
grpc_core::slice_detail::BaseSlice::data
const uint8_t * data() const
Definition: src/core/lib/slice/slice.h:99
grpc_empty_slice
GPRAPI grpc_slice grpc_empty_slice(void)
Definition: slice/slice.cc:42
grpc_slice
Definition: include/grpc/impl/codegen/slice.h:65
grpc::GenerateClientContext
void GenerateClientContext(absl::string_view method, CensusContext *ctxt, CensusContext *parent_ctxt)
Definition: context.cc:52
grpc_core::slice_detail::BaseSlice::size
size_t size() const
Definition: src/core/lib/slice/slice.h:102
grpc::StatusCodeToString
absl::string_view StatusCodeToString(grpc_status_code code)
Definition: context.cc:119
grpc_transport_one_way_stats::data_bytes
uint64_t data_bytes
Definition: transport.h:245
grpc_call_next_op
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op_batch *op)
Definition: channel_stack.cc:251
grpc::GetMethod
absl::string_view GetMethod(const grpc_core::Slice &path)
Definition: cpp/ext/filters/census/context.h:126
grpc_call_element_args
Definition: channel_stack.h:80
grpc::StatusCode
StatusCode
Definition: grpcpp/impl/codegen/status_code_enum.h:26
b
uint64_t b
Definition: abseil-cpp/absl/container/internal/layout_test.cc:53
GRPC_CONTEXT_TRACING
@ GRPC_CONTEXT_TRACING
Value is a census_context.
Definition: core/lib/channel/context.h:37
config.h
grpc::RpcClientSentBytesPerRpc
MeasureDouble RpcClientSentBytesPerRpc()
Definition: measures.cc:46
stdint.h
grpc_op::op
grpc_op_type op
Definition: grpc_types.h:642
gpr_types.h
measures.h
grpc_plugin.h
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd
void RecordEnd(const gpr_timespec &) override
Definition: client_filter.cc:194
grpc_transport_stream_stats::incoming
grpc_transport_one_way_stats incoming
Definition: transport.h:250
grpc_call_context_element::value
void * value
Definition: core/lib/channel/context.h:52
grpc::CensusClientCallData::StartTransportStreamOpBatch
void StartTransportStreamOpBatch(grpc_call_element *elem, TransportStreamOpBatch *op) override
Starts a new stream operation.
Definition: client_filter.cc:79
grpc::OpenCensusCallTracer
Definition: open_census_call_tracer.h:48
grpc::RpcClientSentMessagesPerRpc
MeasureInt64 RpcClientSentMessagesPerRpc()
Definition: measures.cc:78
absl::StatusCode::kOk
@ kOk
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
grpc::OpenCensusCallTracer::context_
CensusContext context_
Definition: open_census_call_tracer.h:104
grpc_core::SliceBuffer
Definition: src/core/lib/slice/slice_buffer.h:44
absl::StatusCode::kCancelled
@ kCancelled
client_filter.h
grpc_transport_stream_stats
Definition: transport.h:249
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
grpc::ClientStatusTagKey
::opencensus::tags::TagKey ClientStatusTagKey()
Definition: grpc_plugin.cc:86
grpc::RpcClientTransparentRetriesPerCall
MeasureInt64 RpcClientTransparentRetriesPerCall()
Definition: measures.cc:102
slice_refcount.h
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::context_
CensusContext context_
Definition: open_census_call_tracer.h:82
transport.h
grpc::OpenCensusCallTracer::call_context_
const grpc_call_context_element * call_context_
Definition: open_census_call_tracer.h:100
context
grpc::ClientContext context
Definition: istio_echo_server_lib.cc:61
absl
Definition: abseil-cpp/absl/algorithm/algorithm.h:31
grpc::CensusContext::tags
const ::opencensus::tags::TagMap & tags() const
Definition: cpp/ext/filters/census/context.h:69
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer
Definition: open_census_call_tracer.h:50
grpc::StatsContextSerialize
size_t StatsContextSerialize(size_t, grpc_slice *)
Definition: context.cc:87
gpr_timespec
Definition: gpr_types.h:50
grpc::CensusContext::Span
const ::opencensus::trace::Span & Span() const
Definition: cpp/ext/filters/census/context.h:68
grpc_error
Definition: error_internal.h:42
parent_
RefCountedPtr< GrpcLb > parent_
Definition: grpclb.cc:438
grpc::CensusContext::AddSpanAttribute
void AddSpanAttribute(absl::string_view key, opencensus::trace::AttributeValueRef attribute)
Definition: cpp/ext/filters/census/context.h:63
method
NSString * method
Definition: ProtoMethod.h:28
grpc_metadata_batch
Definition: metadata_batch.h:1259
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
slice_buffer.h
grpc_core::GrpcTraceBinMetadata
Definition: metadata_batch.h:244
send_initial_metadata
static void send_initial_metadata(void)
Definition: test/core/fling/server.cc:121
grpc::OpenCensusCallTracer::mu_
grpc_core::Mutex mu_
Definition: open_census_call_tracer.h:106
sync.h
grpc::OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel
void RecordCancel(grpc_error_handle cancel_error) override
Definition: client_filter.cc:188
context.h
grpc::OpenCensusCallTracer::path_
grpc_core::Slice path_
Definition: open_census_call_tracer.h:102
grpc::OpenCensusCallTracer::arena_
grpc_core::Arena * arena_
Definition: open_census_call_tracer.h:105
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:57:55