buffer_list.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <grpc/support/log.h>
24 
26 
27 #ifdef GRPC_LINUX_ERRQUEUE
28 #include <netinet/in.h>
29 #include <string.h>
30 #include <time.h>
31 
33 
34 namespace grpc_core {
35 namespace {
37 void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
38  gts->tv_sec = ts->tv_sec;
39  gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec);
41 }
42 
43 void default_timestamps_callback(void* /*arg*/, Timestamps* /*ts*/,
44  grpc_error_handle /*shudown_err*/) {
45  gpr_log(GPR_DEBUG, "Timestamps callback has not been registered");
46 }
47 
50 void (*timestamps_callback)(void*, Timestamps*,
51  grpc_error_handle shutdown_err) =
52  default_timestamps_callback;
53 
/* Copies sizeof(T) bytes starting at \a ptr into a local T with memcpy and
 * returns it. Used to pull individual opt stats out of a cmsg without
 * dereferencing a pointer that may not be suitably aligned for T. */
template <typename T>
T read_unaligned(const void* ptr) {
  T out;
  memcpy(&out, ptr, sizeof(T));
  return out;
}
62 
63 /* Extracts opt stats from the tcp_info struct \a info to \a metrics */
64 void extract_opt_stats_from_tcp_info(ConnectionMetrics* metrics,
65  const tcp_info* info) {
66  if (info == nullptr) {
67  return;
68  }
69  if (info->length > offsetof(tcp_info, tcpi_sndbuf_limited)) {
70  metrics->recurring_retrans.emplace(info->tcpi_retransmits);
71  metrics->is_delivery_rate_app_limited.emplace(
72  info->tcpi_delivery_rate_app_limited);
73  metrics->congestion_window.emplace(info->tcpi_snd_cwnd);
74  metrics->reordering.emplace(info->tcpi_reordering);
75  metrics->packet_retx.emplace(info->tcpi_total_retrans);
76  metrics->pacing_rate.emplace(info->tcpi_pacing_rate);
77  metrics->data_notsent.emplace(info->tcpi_notsent_bytes);
78  if (info->tcpi_min_rtt != UINT32_MAX) {
79  metrics->min_rtt.emplace(info->tcpi_min_rtt);
80  }
81  metrics->packet_sent.emplace(info->tcpi_data_segs_out);
82  metrics->delivery_rate.emplace(info->tcpi_delivery_rate);
83  metrics->busy_usec.emplace(info->tcpi_busy_time);
84  metrics->rwnd_limited_usec.emplace(info->tcpi_rwnd_limited);
85  metrics->sndbuf_limited_usec.emplace(info->tcpi_sndbuf_limited);
86  }
87  if (info->length > offsetof(tcp_info, tcpi_dsack_dups)) {
88  metrics->data_sent.emplace(info->tcpi_bytes_sent);
89  metrics->data_retx.emplace(info->tcpi_bytes_retrans);
90  metrics->packet_spurious_retx.emplace(info->tcpi_dsack_dups);
91  }
92 }
93 
96 void extract_opt_stats_from_cmsg(ConnectionMetrics* metrics,
97  const cmsghdr* opt_stats) {
98  if (opt_stats == nullptr) {
99  return;
100  }
101  const auto* data = CMSG_DATA(opt_stats);
102  constexpr int64_t cmsg_hdr_len = CMSG_ALIGN(sizeof(struct cmsghdr));
103  const int64_t len = opt_stats->cmsg_len - cmsg_hdr_len;
104  int64_t offset = 0;
105 
106  while (offset < len) {
107  const auto* attr = reinterpret_cast<const nlattr*>(data + offset);
108  const void* val = data + offset + NLA_HDRLEN;
109  switch (attr->nla_type) {
110  case TCP_NLA_BUSY: {
111  metrics->busy_usec.emplace(read_unaligned<uint64_t>(val));
112  break;
113  }
114  case TCP_NLA_RWND_LIMITED: {
115  metrics->rwnd_limited_usec.emplace(read_unaligned<uint64_t>(val));
116  break;
117  }
118  case TCP_NLA_SNDBUF_LIMITED: {
119  metrics->sndbuf_limited_usec.emplace(read_unaligned<uint64_t>(val));
120  break;
121  }
122  case TCP_NLA_PACING_RATE: {
123  metrics->pacing_rate.emplace(read_unaligned<uint64_t>(val));
124  break;
125  }
126  case TCP_NLA_DELIVERY_RATE: {
127  metrics->delivery_rate.emplace(read_unaligned<uint64_t>(val));
128  break;
129  }
130  case TCP_NLA_DELIVERY_RATE_APP_LMT: {
131  metrics->is_delivery_rate_app_limited.emplace(
132  read_unaligned<uint8_t>(val));
133  break;
134  }
135  case TCP_NLA_SND_CWND: {
136  metrics->congestion_window.emplace(read_unaligned<uint32_t>(val));
137  break;
138  }
139  case TCP_NLA_MIN_RTT: {
140  metrics->min_rtt.emplace(read_unaligned<uint32_t>(val));
141  break;
142  }
143  case TCP_NLA_SRTT: {
144  metrics->srtt.emplace(read_unaligned<uint32_t>(val));
145  break;
146  }
147  case TCP_NLA_RECUR_RETRANS: {
148  metrics->recurring_retrans.emplace(read_unaligned<uint8_t>(val));
149  break;
150  }
151  case TCP_NLA_BYTES_SENT: {
152  metrics->data_sent.emplace(read_unaligned<uint64_t>(val));
153  break;
154  }
155  case TCP_NLA_DATA_SEGS_OUT: {
156  metrics->packet_sent.emplace(read_unaligned<uint64_t>(val));
157  break;
158  }
159  case TCP_NLA_TOTAL_RETRANS: {
160  metrics->packet_retx.emplace(read_unaligned<uint64_t>(val));
161  break;
162  }
163  case TCP_NLA_DELIVERED: {
164  metrics->packet_delivered.emplace(read_unaligned<uint32_t>(val));
165  break;
166  }
167  case TCP_NLA_DELIVERED_CE: {
168  metrics->packet_delivered_ce.emplace(read_unaligned<uint32_t>(val));
169  break;
170  }
171  case TCP_NLA_BYTES_RETRANS: {
172  metrics->data_retx.emplace(read_unaligned<uint64_t>(val));
173  break;
174  }
175  case TCP_NLA_DSACK_DUPS: {
176  metrics->packet_spurious_retx.emplace(read_unaligned<uint32_t>(val));
177  break;
178  }
179  case TCP_NLA_REORDERING: {
180  metrics->reordering.emplace(read_unaligned<uint32_t>(val));
181  break;
182  }
183  case TCP_NLA_SND_SSTHRESH: {
184  metrics->snd_ssthresh.emplace(read_unaligned<uint32_t>(val));
185  break;
186  }
187  }
188  offset += NLA_ALIGN(attr->nla_len);
189  }
190 }
191 
192 int get_socket_tcp_info(tcp_info* info, int fd) {
193  memset(info, 0, sizeof(*info));
194  info->length = offsetof(tcp_info, length);
195  return getsockopt(fd, IPPROTO_TCP, TCP_INFO, info, &(info->length));
196 }
197 } /* namespace */
198 
199 void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, int fd,
200  void* arg) {
201  GPR_DEBUG_ASSERT(head != nullptr);
202  TracedBuffer* new_elem = new TracedBuffer(seq_no, arg);
203  /* Store the current time as the sendmsg time. */
204  new_elem->ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
205  new_elem->ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
206  new_elem->ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
207  new_elem->ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
208 
209  if (get_socket_tcp_info(&new_elem->ts_.info, fd) == 0) {
210  extract_opt_stats_from_tcp_info(&new_elem->ts_.sendmsg_time.metrics,
211  &new_elem->ts_.info);
212  }
213  if (*head == nullptr) {
214  *head = new_elem;
215  return;
216  }
217  /* Append at the end. */
218  TracedBuffer* ptr = *head;
219  while (ptr->next_ != nullptr) {
220  ptr = ptr->next_;
221  }
222  ptr->next_ = new_elem;
223 }
224 
225 void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
226  struct sock_extended_err* serr,
227  struct cmsghdr* opt_stats,
228  struct scm_timestamping* tss) {
229  GPR_DEBUG_ASSERT(head != nullptr);
230  TracedBuffer* elem = *head;
231  TracedBuffer* next = nullptr;
232  while (elem != nullptr) {
233  /* The byte number refers to the sequence number of the last byte which this
234  * timestamp relates to. */
235  if (serr->ee_data >= elem->seq_no_) {
236  switch (serr->ee_info) {
237  case SCM_TSTAMP_SCHED:
238  fill_gpr_from_timestamp(&(elem->ts_.scheduled_time.time),
239  &(tss->ts[0]));
240  extract_opt_stats_from_cmsg(&(elem->ts_.scheduled_time.metrics),
241  opt_stats);
242  elem = elem->next_;
243  break;
244  case SCM_TSTAMP_SND:
245  fill_gpr_from_timestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
246  extract_opt_stats_from_cmsg(&(elem->ts_.sent_time.metrics),
247  opt_stats);
248  elem = elem->next_;
249  break;
250  case SCM_TSTAMP_ACK:
251  fill_gpr_from_timestamp(&(elem->ts_.acked_time.time), &(tss->ts[0]));
252  extract_opt_stats_from_cmsg(&(elem->ts_.acked_time.metrics),
253  opt_stats);
254  /* Got all timestamps. Do the callback and free this TracedBuffer.
255  * The thing below can be passed by value if we don't want the
256  * restriction on the lifetime. */
257  timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE);
258  next = elem->next_;
259  delete static_cast<TracedBuffer*>(elem);
260  *head = elem = next;
261  break;
262  default:
263  abort();
264  }
265  } else {
266  break;
267  }
268  }
269 }
270 
271 void TracedBuffer::Shutdown(TracedBuffer** head, void* remaining,
272  grpc_error_handle shutdown_err) {
273  GPR_DEBUG_ASSERT(head != nullptr);
274  TracedBuffer* elem = *head;
275  while (elem != nullptr) {
276  timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
277  auto* next = elem->next_;
278  delete elem;
279  elem = next;
280  }
281  *head = nullptr;
282  if (remaining != nullptr) {
283  timestamps_callback(remaining, nullptr, shutdown_err);
284  }
285  GRPC_ERROR_UNREF(shutdown_err);
286 }
287 
289  void (*fn)(void*, Timestamps*, grpc_error_handle error)) {
290  timestamps_callback = fn;
291 }
292 } /* namespace grpc_core */
293 
294 #else /* GRPC_LINUX_ERRQUEUE */
295 
296 namespace grpc_core {
298  void (*fn)(void*, Timestamps*, grpc_error_handle error)) {
299  // Cast value of fn to void to avoid unused parameter warning.
300  // Can't comment out the name because some compilers and formatters don't
301  // like the sequence */* , which would arise from */*fn*/.
302  (void)fn;
303  gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform");
304 }
305 } /* namespace grpc_core */
306 
307 #endif /* GRPC_LINUX_ERRQUEUE */
gpr_timespec::tv_nsec
int32_t tv_nsec
Definition: gpr_types.h:52
ptr
char * ptr
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:45
gpr_timespec::tv_sec
int64_t tv_sec
Definition: gpr_types.h:51
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
memset
return memset(p, 0, total)
GPR_DEBUG_ASSERT
#define GPR_DEBUG_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:103
buffer_list.h
grpc_core
Definition: call_metric_recorder.h:31
string.h
elem
Timer elem
Definition: event_engine/iomgr_event_engine/timer_heap_test.cc:109
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_core::grpc_tcp_set_write_timestamps_callback
void grpc_tcp_set_write_timestamps_callback(void(*fn)(void *, Timestamps *, grpc_error_handle error))
Definition: buffer_list.cc:297
UINT32_MAX
#define UINT32_MAX
Definition: stdint-msvc2008.h:142
grpc_core::Timestamps
Definition: buffer_list.h:90
T
#define T(upbtypeconst, upbtype, ctype, default_value)
memory.h
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
memcpy
memcpy(mem, inblock.get(), min(CONTAINING_RECORD(inblock.get(), MEMBLOCK, data) ->size, size))
int64_t
signed __int64 int64_t
Definition: stdint-msvc2008.h:89
generate-asm-lcov.fn
fn
Definition: generate-asm-lcov.py:146
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
arg
Definition: cmdline.cc:40
data
char data[kBufferLength]
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:1006
gpr_now
GPRAPI gpr_timespec gpr_now(gpr_clock_type clock)
gpr_timespec::clock_type
gpr_clock_type clock_type
Definition: gpr_types.h:55
gpr_inf_past
GPRAPI gpr_timespec gpr_inf_past(gpr_clock_type type)
Definition: src/core/lib/gpr/time.cc:63
grpc_core::TracedBuffer::Shutdown
static void Shutdown(TracedBuffer **, void *, grpc_error_handle shutdown_err)
Definition: buffer_list.h:149
attr
OPENSSL_EXPORT X509_ATTRIBUTE * attr
Definition: x509.h:1666
port.h
next
AllocList * next[kMaxLevel]
Definition: abseil-cpp/absl/base/internal/low_level_alloc.cc:100
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
len
int len
Definition: abseil-cpp/absl/base/internal/low_level_alloc_test.cc:46
gpr_timespec
Definition: gpr_types.h:50
grpc_error
Definition: error_internal.h:42
length
std::size_t length
Definition: abseil-cpp/absl/time/internal/test_util.cc:57
GPR_CLOCK_REALTIME
@ GPR_CLOCK_REALTIME
Definition: gpr_types.h:39
int32_t
signed int int32_t
Definition: stdint-msvc2008.h:77
offset
voidpf uLong offset
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:142
port_platform.h


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:41