server_filter.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <utility>
24 
25 #include "absl/strings/str_cat.h"
26 #include "absl/strings/string_view.h"
27 #include "absl/time/clock.h"
28 #include "absl/time/time.h"
29 #include "absl/types/optional.h"
30 #include "opencensus/stats/stats.h"
31 #include "opencensus/tags/tag_key.h"
32 
33 #include <grpc/grpc.h>
34 #include <grpc/support/log.h>
35 
42 
43 namespace grpc {
44 
46 
47 namespace {
48 
49 // server metadata elements
50 struct ServerMetadataElements {
52  grpc_core::Slice tracing_slice;
53  grpc_core::Slice census_proto;
54 };
55 
56 void FilterInitialMetadata(grpc_metadata_batch* b,
57  ServerMetadataElements* sml) {
58  const auto* path = b->get_pointer(grpc_core::HttpPathMetadata());
59  if (path != nullptr) {
60  sml->path = path->Ref();
61  }
62  auto grpc_trace_bin = b->Take(grpc_core::GrpcTraceBinMetadata());
63  if (grpc_trace_bin.has_value()) {
64  sml->tracing_slice = std::move(*grpc_trace_bin);
65  }
66  auto grpc_tags_bin = b->Take(grpc_core::GrpcTagsBinMetadata());
67  if (grpc_tags_bin.has_value()) {
68  sml->census_proto = std::move(*grpc_tags_bin);
69  }
70 }
71 
72 } // namespace
73 
76  grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
77  CensusServerCallData* calld =
78  reinterpret_cast<CensusServerCallData*>(elem->call_data);
79  CensusChannelData* channeld =
80  reinterpret_cast<CensusChannelData*>(elem->channel_data);
81  GPR_ASSERT(calld != nullptr);
82  GPR_ASSERT(channeld != nullptr);
83  // Stream messages are no longer valid after receiving trailing metadata.
84  if (calld->recv_message_->has_value()) {
85  ++calld->recv_message_count_;
86  }
89 }
90 
92  void* user_data, grpc_error_handle error) {
93  grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
94  CensusServerCallData* calld =
95  reinterpret_cast<CensusServerCallData*>(elem->call_data);
96  GPR_ASSERT(calld != nullptr);
98  grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
99  GPR_ASSERT(initial_metadata != nullptr);
100  ServerMetadataElements sml;
101  FilterInitialMetadata(initial_metadata, &sml);
102  calld->path_ = std::move(sml.path);
103  calld->method_ = GetMethod(calld->path_);
104  calld->qualified_method_ = absl::StrCat("Recv.", calld->method_);
105  GenerateServerContext(sml.tracing_slice.as_string_view(),
106  calld->qualified_method_, &calld->context_);
108  calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
109  }
113 }
114 
117  if (op->recv_initial_metadata() != nullptr) {
118  // substitute our callback for the op callback
119  recv_initial_metadata_ = op->recv_initial_metadata()->batch();
120  initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
121  op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
122  }
123  if (op->send_message() != nullptr) {
125  }
126  if (op->recv_message() != nullptr) {
127  recv_message_ = op->op()->payload->recv_message.recv_message;
129  op->op()->payload->recv_message.recv_message_ready;
130  op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
131  }
132  // We need to record the time when the trailing metadata was sent to mark the
133  // completeness of the request.
134  if (op->send_trailing_metadata() != nullptr) {
138  if (len > 0) {
139  op->send_trailing_metadata()->batch()->Set(
142  }
143  }
144  // Call next op.
146 }
147 
151  gc_ =
155  grpc_schedule_on_exec_ctx);
157  grpc_schedule_on_exec_ctx);
159  return GRPC_ERROR_NONE;
160 }
161 
163  const grpc_call_final_info* final_info,
164  grpc_closure* /*then_call_closure*/) {
165  const uint64_t request_size = GetOutgoingDataSize(final_info);
166  const uint64_t response_size = GetIncomingDataSize(final_info);
167  double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
169  ::opencensus::stats::Record(
170  {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
171  {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
172  {RpcServerServerLatency(), elapsed_time_ms},
177  context_.EndSpan();
178 }
179 
180 } // namespace grpc
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc::RpcServerServerLatency
MeasureDouble RpcServerServerLatency()
Definition: measures.cc:133
census_context
struct census_context census_context
Definition: census.h:34
grpc::GetOutgoingDataSize
uint64_t GetOutgoingDataSize(const grpc_call_final_info *final_info)
Definition: context.cc:107
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
grpc::CensusServerCallData::OnDoneRecvInitialMetadataCb
static void OnDoneRecvInitialMetadataCb(void *user_data, grpc_error_handle error)
Definition: server_filter.cc:91
grpc
Definition: grpcpp/alarm.h:33
grpc_census_call_set_context
GRPCAPI void grpc_census_call_set_context(grpc_call *call, struct census_context *context)
Definition: grpc_context.cc:30
grpc_core::Slice
Definition: src/core/lib/slice/slice.h:282
grpc_core::HttpPathMetadata
Definition: metadata_batch.h:262
channel_filter.h
grpc::CensusServerCallData::recv_message_count_
uint64_t recv_message_count_
Definition: server_filter.h:105
grpc::CensusServerCallData::OnDoneRecvMessageCb
static void OnDoneRecvMessageCb(void *user_data, grpc_error_handle error)
Definition: server_filter.cc:74
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc::CensusServerCallData::kMaxServerStatsLen
static constexpr uint32_t kMaxServerStatsLen
Definition: server_filter.h:56
grpc::CensusServerCallData::Init
grpc_error_handle Init(grpc_call_element *elem, const grpc_call_element_args *args) override
Initializes the call data.
Definition: server_filter.cc:148
grpc::CensusServerCallData::initial_on_done_recv_message_
grpc_closure * initial_on_done_recv_message_
Definition: server_filter.h:100
grpc_call_final_info::final_status
grpc_status_code final_status
Definition: channel_stack.h:97
check_documentation.path
path
Definition: check_documentation.py:57
grpc::CensusContext::EndSpan
void EndSpan()
Definition: cpp/ext/filters/census/context.h:72
grpc_call_stack_element
grpc_call_element * grpc_call_stack_element(grpc_call_stack *call_stack, size_t index)
Definition: channel_stack.cc:100
grpc::CensusServerCallData::on_done_recv_initial_metadata_
grpc_closure on_done_recv_initial_metadata_
Definition: server_filter.h:98
grpc::CensusServerCallData::start_time_
absl::Time start_time_
Definition: server_filter.h:102
grpc_call_element
Definition: channel_stack.h:194
grpc::CensusChannelData
Definition: ext/filters/census/channel_filter.h:31
grpc_auth_context_release
GRPCAPI void grpc_auth_context_release(grpc_auth_context *context)
Definition: security_context.cc:94
grpc::ServerStatusTagKey
::opencensus::tags::TagKey ServerStatusTagKey()
Definition: grpc_plugin.cc:98
grpc_core::GrpcServerStatsBinMetadata
Definition: metadata_batch.h:238
grpc::ServerStatsSerialize
size_t ServerStatsSerialize(uint64_t server_elapsed_time, char *buf, size_t buf_size)
Definition: context.cc:92
grpc::CensusServerCallData::method_
absl::string_view method_
Definition: server_filter.h:88
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
grpc::CensusServerCallData::StartTransportStreamOpBatch
void StartTransportStreamOpBatch(grpc_call_element *elem, TransportStreamOpBatch *op) override
Starts a new stream operation.
Definition: server_filter.cc:115
grpc::TransportStreamOpBatch
A C++ wrapper for the grpc_transport_stream_op_batch struct.
Definition: common/channel_filter.h:99
grpc::CensusServerCallData::recv_message_
absl::optional< grpc_core::SliceBuffer > * recv_message_
Definition: server_filter.h:104
grpc::RpcServerSentMessagesPerRpc
MeasureInt64 RpcServerSentMessagesPerRpc()
Definition: measures.cc:142
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
grpc::CensusServerCallData::qualified_method_
std::string qualified_method_
Definition: server_filter.h:89
grpc_core::GrpcTagsBinMetadata
Definition: metadata_batch.h:250
grpc::GenerateServerContext
void GenerateServerContext(absl::string_view tracing, absl::string_view method, CensusContext *context)
Definition: context.cc:38
grpc_core::slice_detail::CopyConstructors< Slice >::FromCopiedBuffer
static Slice FromCopiedBuffer(const char *p, size_t len)
Definition: src/core/lib/slice/slice.h:182
absl::ToDoubleMilliseconds
double ToDoubleMilliseconds(Duration d)
Definition: abseil-cpp/absl/time/duration.cc:593
grpc::CensusServerCallData::context_
CensusContext context_
Definition: server_filter.h:86
grpc.h
uint64_t
unsigned __int64 uint64_t
Definition: stdint-msvc2008.h:90
grpc::CensusServerCallData::stats_buf_
char stats_buf_[kMaxServerStatsLen]
Definition: server_filter.h:109
grpc::StatusCodeToString
absl::string_view StatusCodeToString(grpc_status_code code)
Definition: context.cc:119
grpc_call_auth_context
GRPCAPI grpc_auth_context * grpc_call_auth_context(grpc_call *call)
Definition: security_context.cc:69
grpc_call_next_op
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op_batch *op)
Definition: channel_stack.cc:251
grpc::GetMethod
absl::string_view GetMethod(const grpc_core::Slice &path)
Definition: cpp/ext/filters/census/context.h:126
grpc_call_element_args
Definition: channel_stack.h:80
b
uint64_t b
Definition: abseil-cpp/absl/container/internal/layout_test.cc:53
grpc_op::op
grpc_op_type op
Definition: grpc_types.h:642
measures.h
grpc::CensusServerCallData::gc_
grpc_call * gc_
Definition: server_filter.h:92
grpc_plugin.h
absl::Now
ABSL_NAMESPACE_BEGIN Time Now()
Definition: abseil-cpp/absl/time/clock.cc:39
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
debug_location.h
grpc::CensusServerCallData::sent_message_count_
uint64_t sent_message_count_
Definition: server_filter.h:106
grpc::RpcServerReceivedBytesPerRpc
MeasureDouble RpcServerReceivedBytesPerRpc()
Definition: measures.cc:126
grpc::CensusServerCallData
Definition: server_filter.h:53
grpc::ServerMethodTagKey
::opencensus::tags::TagKey ServerMethodTagKey()
Definition: grpc_plugin.cc:92
grpc::CensusServerCallData::initial_on_done_recv_initial_metadata_
grpc_closure * initial_on_done_recv_initial_metadata_
Definition: server_filter.h:97
grpc::CensusServerCallData::elapsed_time_
absl::Duration elapsed_time_
Definition: server_filter.h:103
grpc::CensusServerCallData::auth_context_
grpc_auth_context * auth_context_
Definition: server_filter.h:94
grpc::GetIncomingDataSize
uint64_t GetIncomingDataSize(const grpc_call_final_info *final_info)
Definition: context.cc:103
grpc::RpcServerSentBytesPerRpc
MeasureDouble RpcServerSentBytesPerRpc()
Definition: measures.cc:119
transport.h
grpc::CensusServerCallData::path_
grpc_core::Slice path_
Definition: server_filter.h:90
grpc::RpcServerReceivedMessagesPerRpc
MeasureInt64 RpcServerReceivedMessagesPerRpc()
Definition: measures.cc:149
grpc::CensusServerCallData::on_done_recv_message_
grpc_closure on_done_recv_message_
Definition: server_filter.h:101
grpc_call_final_info
Definition: channel_stack.h:95
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
grpc_error
Definition: error_internal.h:42
server_filter.h
grpc::CensusServerCallData::recv_initial_metadata_
grpc_metadata_batch * recv_initial_metadata_
Definition: server_filter.h:96
grpc_metadata_batch
Definition: metadata_batch.h:1259
grpc_closure
Definition: closure.h:56
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
grpc_call_from_top_element
grpc_call * grpc_call_from_top_element(grpc_call_element *surface_element)
Definition: call.cc:1778
grpc_core::GrpcTraceBinMetadata
Definition: metadata_batch.h:244
grpc::CensusServerCallData::Destroy
void Destroy(grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *then_call_closure) override
Definition: server_filter.cc:162
grpc_core::Closure::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: closure.h:250
call.h
absl::ToInt64Nanoseconds
int64_t ToInt64Nanoseconds(Duration d)
Definition: abseil-cpp/absl/time/duration.cc:544
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:11