tcp_posix.cc
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#include "src/core/lib/iomgr/tcp_posix.h"

#ifdef GRPC_POSIX_SOCKET_TCP

#include <errno.h>
#include <limits.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <unordered_map>

#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/global_config.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/resource_quota/trace.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"

#ifndef SOL_TCP
#define SOL_TCP IPPROTO_TCP
#endif

#ifndef TCP_INQ
#define TCP_INQ 36
#define TCP_CM_INQ TCP_INQ
#endif

#ifdef GRPC_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS MSG_NOSIGNAL
#else
#define SENDMSG_FLAGS 0
#endif

// TCP zero copy sendmsg flag.
// NB: We define this here as a fallback in case we're using an older set of
// library headers that has not defined MSG_ZEROCOPY. Since this constant is
// part of the kernel, we are guaranteed it will never change/disagree so
// defining it here is safe.
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0x4000000
#endif

#ifdef GRPC_MSG_IOVLEN_TYPE
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
typedef size_t msg_iovlen_type;
#endif

extern grpc_core::TraceFlag grpc_tcp_trace;

GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_experimental_enable_tcp_frame_size_tuning);

namespace grpc_core {

class TcpZerocopySendRecord {
 public:
  TcpZerocopySendRecord() { grpc_slice_buffer_init(&buf_); }

  ~TcpZerocopySendRecord() {
    AssertEmpty();
    grpc_slice_buffer_destroy_internal(&buf_);
  }

  // Given the slices that we wish to send, and the current offset into the
  // slice buffer (indicating which have already been sent), populate an iovec
  // array that will be used for a zerocopy enabled sendmsg().
  msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx,
                               size_t* unwind_byte_idx, size_t* sending_length,
                               iovec* iov);

  // A sendmsg() may not be able to send the bytes that we requested at this
  // time, returning EAGAIN (possibly due to backpressure). In this case,
  // unwind the offset into the slice buffer so we retry sending these bytes.
  void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) {
    out_offset_.byte_idx = unwind_byte_idx;
    out_offset_.slice_idx = unwind_slice_idx;
  }

  // Update the offset into the slice buffer based on how much we wanted to
  // send vs. what sendmsg() actually sent (which may be lower, possibly due
  // to backpressure).
  void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent);

  // Indicates whether all underlying data has been sent or not.
  bool AllSlicesSent() { return out_offset_.slice_idx == buf_.count; }

  // Reset this structure for a new tcp_write() with zerocopy.
  void PrepareForSends(grpc_slice_buffer* slices_to_send) {
    AssertEmpty();
    out_offset_.slice_idx = 0;
    out_offset_.byte_idx = 0;
    grpc_slice_buffer_swap(slices_to_send, &buf_);
    Ref();
  }

  // References: 1 reference per sendmsg(), and 1 for the tcp_write().
  void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); }

  // Unref: called when we get an error queue notification for a sendmsg(),
  // if a sendmsg() failed or when tcp_write() is done.
  bool Unref() {
    const intptr_t prior = ref_.fetch_sub(1, std::memory_order_acq_rel);
    GPR_DEBUG_ASSERT(prior > 0);
    if (prior == 1) {
      AllSendsComplete();
      return true;
    }
    return false;
  }

 private:
  struct OutgoingOffset {
    size_t slice_idx = 0;
    size_t byte_idx = 0;
  };

  void AssertEmpty() {
    GPR_DEBUG_ASSERT(buf_.count == 0);
    GPR_DEBUG_ASSERT(buf_.length == 0);
    GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0);
  }

  // When all sendmsg() calls associated with this tcp_write() have been
  // completed (i.e. we have received the notifications for each sequence
  // number for each sendmsg()) and all reference counts have been dropped,
  // drop our reference to the underlying data since we no longer need it.
  void AllSendsComplete() {
    GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0);
    grpc_slice_buffer_reset_and_unref_internal(&buf_);
  }

  grpc_slice_buffer buf_;
  std::atomic<intptr_t> ref_{0};
  OutgoingOffset out_offset_;
};
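
// Lifecycle sketch (illustrative only, not part of the upstream file): a
// zerocopy-enabled tcp_write() drives one record roughly as follows, where
// kIovLen is a hypothetical iovec array size:
//
//   record->PrepareForSends(slices);  // swap in the data, take tcp_write ref
//   while (!record->AllSlicesSent()) {
//     size_t unwind_slice, unwind_byte, len = 0;
//     iovec iov[kIovLen];
//     msg_iovlen_type n =
//         record->PopulateIovs(&unwind_slice, &unwind_byte, &len, iov);
//     ssize_t sent = ...;  // sendmsg(fd, &msg, MSG_ZEROCOPY)
//     if (sent < 0) {
//       record->UnwindIfThrottled(unwind_slice, unwind_byte);  // e.g. EAGAIN
//       break;
//     }
//     record->UpdateOffsetForBytesSent(len, sent);
//   }
//   if (record->Unref()) { /* all sendmsg notifications already arrived */ }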

class TcpZerocopySendCtx {
 public:
  static constexpr int kDefaultMaxSends = 4;
  static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;  // 16KB

  explicit TcpZerocopySendCtx(
      int max_sends = kDefaultMaxSends,
      size_t send_bytes_threshold = kDefaultSendBytesThreshold)
      : max_sends_(max_sends),
        free_send_records_size_(max_sends),
        threshold_bytes_(send_bytes_threshold) {
    send_records_ = static_cast<TcpZerocopySendRecord*>(
        gpr_malloc(max_sends * sizeof(*send_records_)));
    free_send_records_ = static_cast<TcpZerocopySendRecord**>(
        gpr_malloc(max_sends * sizeof(*free_send_records_)));
    if (send_records_ == nullptr || free_send_records_ == nullptr) {
      gpr_free(send_records_);
      gpr_free(free_send_records_);
      gpr_log(GPR_INFO, "Disabling TCP TX zerocopy due to memory pressure.\n");
      memory_limited_ = true;
    } else {
      for (int idx = 0; idx < max_sends_; ++idx) {
        new (send_records_ + idx) TcpZerocopySendRecord();
        free_send_records_[idx] = send_records_ + idx;
      }
    }
  }

  ~TcpZerocopySendCtx() {
    if (send_records_ != nullptr) {
      for (int idx = 0; idx < max_sends_; ++idx) {
        send_records_[idx].~TcpZerocopySendRecord();
      }
    }
    gpr_free(send_records_);
    gpr_free(free_send_records_);
  }

  // True if we were unable to allocate the various bookkeeping structures at
  // transport initialization time. If memory limited, we do not zerocopy.
  bool memory_limited() const { return memory_limited_; }

  // TCP send zerocopy maintains an implicit sequence number for every
  // successful sendmsg() with zerocopy enabled; the kernel later gives us an
  // error queue notification with this sequence number indicating that the
  // underlying data buffers that we sent can now be released. Once that
  // notification is received, we can release the buffers associated with
  // this zerocopy send record. Here, we associate the sequence number with
  // the data buffers that were sent with the corresponding call to sendmsg().
  void NoteSend(TcpZerocopySendRecord* record) {
    record->Ref();
    AssociateSeqWithSendRecord(last_send_, record);
    ++last_send_;
  }

  // If sendmsg() actually failed, though, we need to revert the sequence
  // number that we speculatively bumped before calling sendmsg(). Note that
  // we bump this sequence number and perform relevant bookkeeping (see:
  // NoteSend()) *before* calling sendmsg() since, if we called it *after*
  // sendmsg(), then there is a possible race with the release notification
  // which could occur on another thread before we do the necessary
  // bookkeeping. Hence, calling NoteSend() *before* sendmsg() and
  // implementing an undo function is needed.
  void UndoSend() {
    --last_send_;
    if (ReleaseSendRecord(last_send_)->Unref()) {
      // We should still be holding the ref taken by tcp_write().
      GPR_DEBUG_ASSERT(0);
    }
  }

  // Simply associate this send record (and the underlying sent data buffers)
  // with the implicit sequence number for this zerocopy sendmsg().
  void AssociateSeqWithSendRecord(uint32_t seq, TcpZerocopySendRecord* record) {
    MutexLock guard(&lock_);
    ctx_lookup_.emplace(seq, record);
  }

  // Get a send record for a send that we wish to do with zerocopy.
  TcpZerocopySendRecord* GetSendRecord() {
    MutexLock guard(&lock_);
    return TryGetSendRecordLocked();
  }

  // A given send record corresponds to a single tcp_write() with zerocopy
  // enabled. This can result in several sendmsg() calls to flush all of the
  // data to wire. Each sendmsg() takes a reference on the
  // TcpZerocopySendRecord, and corresponds to a single sequence number.
  // ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a
  // single sequence number. This is called either when the relevant error
  // queue notification (saying that we can discard the underlying buffers
  // for this sendmsg()) is received from the kernel, or when sendmsg() was
  // unsuccessful to begin with.
  TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) {
    MutexLock guard(&lock_);
    return ReleaseSendRecordLocked(seq);
  }

  // After all the references to a TcpZerocopySendRecord are released, we can
  // add it back to the pool (of size max_sends_). Note that we can only have
  // max_sends_ tcp_write() instances with zerocopy enabled in flight at the
  // same time.
  void PutSendRecord(TcpZerocopySendRecord* record) {
    GPR_DEBUG_ASSERT(record >= send_records_ &&
                     record < send_records_ + max_sends_);
    MutexLock guard(&lock_);
    PutSendRecordLocked(record);
  }

  // Indicate that we are disposing of this zerocopy context. This indicator
  // will prevent new zerocopy writes from being issued.
  void Shutdown() { shutdown_.store(true, std::memory_order_release); }

  // Indicates that there are no inflight tcp_write() instances with zerocopy
  // enabled.
  bool AllSendRecordsEmpty() {
    MutexLock guard(&lock_);
    return free_send_records_size_ == max_sends_;
  }

  bool enabled() const { return enabled_; }

  void set_enabled(bool enabled) {
    GPR_DEBUG_ASSERT(!enabled || !memory_limited());
    enabled_ = enabled;
  }

  // Only use zerocopy if we are sending at least this many bytes. The
  // additional overhead of reading the error queue for notifications means
  // that zerocopy is not useful for small transfers.
  size_t threshold_bytes() const { return threshold_bytes_; }

 private:
  TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq) {
    auto iter = ctx_lookup_.find(seq);
    GPR_DEBUG_ASSERT(iter != ctx_lookup_.end());
    TcpZerocopySendRecord* record = iter->second;
    ctx_lookup_.erase(iter);
    return record;
  }

  TcpZerocopySendRecord* TryGetSendRecordLocked() {
    if (shutdown_.load(std::memory_order_acquire)) {
      return nullptr;
    }
    if (free_send_records_size_ == 0) {
      return nullptr;
    }
    free_send_records_size_--;
    return free_send_records_[free_send_records_size_];
  }

  void PutSendRecordLocked(TcpZerocopySendRecord* record) {
    GPR_DEBUG_ASSERT(free_send_records_size_ < max_sends_);
    free_send_records_[free_send_records_size_] = record;
    free_send_records_size_++;
  }

  TcpZerocopySendRecord* send_records_;
  TcpZerocopySendRecord** free_send_records_;
  int max_sends_;
  int free_send_records_size_;
  Mutex lock_;
  uint32_t last_send_ = 0;
  std::atomic<bool> shutdown_{false};
  bool enabled_ = false;
  size_t threshold_bytes_ = kDefaultSendBytesThreshold;
  std::unordered_map<uint32_t, TcpZerocopySendRecord*> ctx_lookup_;
  bool memory_limited_ = false;
};
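
// Sequence-number protocol sketch (illustrative only): bookkeeping happens
// *before* the syscall so that the kernel's completion notification, which
// may arrive on another thread, cannot race it:
//
//   ctx.NoteSend(record);          // ref record, map seq -> record
//   ssize_t sent = ...;            // sendmsg(fd, &msg, MSG_ZEROCOPY)
//   if (sent < 0) ctx.UndoSend();  // revert the speculative bookkeeping
//   ...
//   // later, per completed sequence number read off the error queue:
//   TcpZerocopySendRecord* r = ctx.ReleaseSendRecord(seq);
//   if (r->Unref()) ctx.PutSendRecord(r);  // back to the free pool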

}  // namespace grpc_core

using grpc_core::TcpZerocopySendCtx;
using grpc_core::TcpZerocopySendRecord;

namespace {

bool ExperimentalTcpFrameSizeTuningEnabled() {
  static const bool kEnableTcpFrameSizeTuning =
      GPR_GLOBAL_CONFIG_GET(grpc_experimental_enable_tcp_frame_size_tuning);
  return kEnableTcpFrameSizeTuning;
}

struct grpc_tcp {
  grpc_tcp(int max_sends, size_t send_bytes_threshold)
      : tcp_zerocopy_send_ctx(max_sends, send_bytes_threshold) {}
  grpc_endpoint base;
  grpc_fd* em_fd;
  int fd;
  /* Used by the endpoint read function to distinguish the very first read
   * call from the rest */
  bool is_first_read;
  bool has_posted_reclaimer;
  double target_length;
  double bytes_read_this_round;
  grpc_core::RefCount refcount;
  gpr_atm shutdown_count;

  int min_read_chunk_size;
  int max_read_chunk_size;

  /* garbage after the last read */
  grpc_slice_buffer last_read_buffer;

  grpc_core::Mutex read_mu;
  grpc_slice_buffer* incoming_buffer ABSL_GUARDED_BY(read_mu) = nullptr;
  int inq;          /* bytes pending on the socket from the last read. */
  bool inq_capable; /* cache whether kernel supports inq */

  grpc_slice_buffer* outgoing_buffer;
  /* byte within outgoing_buffer->slices[0] to write next */
  size_t outgoing_byte_idx;

  grpc_closure* read_cb;
  grpc_closure* write_cb;
  grpc_closure* release_fd_cb;
  int* release_fd;

  grpc_closure read_done_closure;
  grpc_closure write_done_closure;
  grpc_closure error_closure;

  std::string peer_string;
  std::string local_address;

  grpc_core::MemoryOwner memory_owner;
  grpc_core::MemoryAllocator::Reservation self_reservation;

  grpc_core::TracedBuffer* tb_head; /* List of traced buffers */
  gpr_mu tb_mu; /* Lock for access to list of traced buffers */

  /* grpc_endpoint_write takes an argument which if non-null means that the
   * transport layer wants the TCP layer to collect timestamps for this write.
   * This arg is forwarded to the timestamps callback function when the ACK
   * timestamp is received from the kernel. This arg is a (void *) which
   * allows users of this API to pass in a pointer to any kind of structure.
   * This structure could actually be a tag or any book-keeping object that
   * the user can use to distinguish between different traced writes. The
   * only requirement from the TCP endpoint layer is that this arg should be
   * non-null if the user wants timestamps for the write. */
  void* outgoing_buffer_arg;
  /* A counter which starts at 0. It is initialized the first time the socket
   * options for collecting timestamps are set, and is incremented with each
   * byte sent. */
  int bytes_counter;
  bool socket_ts_enabled; /* True if timestamping options are set on the
                             socket */
  bool ts_capable;        /* Cache whether we can set timestamping options */
  gpr_atm stop_error_notification; /* Set to 1 if we do not want to be
                                      notified on errors anymore */
  TcpZerocopySendCtx tcp_zerocopy_send_ctx;
  TcpZerocopySendRecord* current_zerocopy_send = nullptr;

  bool frame_size_tuning_enabled;
  int min_progress_size; /* A hint from upper layers specifying the minimum
                            number of bytes that need to be read to make
                            meaningful progress */
};

struct backup_poller {
  gpr_mu* pollset_mu;
  grpc_closure run_poller;
};

}  // namespace

static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp);

#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))

static grpc_core::Mutex* g_backup_poller_mu = nullptr;
static int g_uncovered_notifications_pending
    ABSL_GUARDED_BY(g_backup_poller_mu);
static backup_poller* g_backup_poller ABSL_GUARDED_BY(g_backup_poller_mu);

static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */,
                                                 grpc_error_handle error);

static void done_poller(void* bp, grpc_error_handle /*error_ignored*/) {
  backup_poller* p = static_cast<backup_poller*>(bp);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "BACKUP_POLLER:%p destroy", p);
  }
  grpc_pollset_destroy(BACKUP_POLLER_POLLSET(p));
  gpr_free(p);
}

static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
  backup_poller* p = static_cast<backup_poller*>(bp);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "BACKUP_POLLER:%p run", p);
  }
  gpr_mu_lock(p->pollset_mu);
  grpc_core::Timestamp deadline =
      grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(10);
  GRPC_LOG_IF_ERROR(
      "backup_poller:pollset_work",
      grpc_pollset_work(BACKUP_POLLER_POLLSET(p), nullptr, deadline));
  gpr_mu_unlock(p->pollset_mu);
  g_backup_poller_mu->Lock();
  /* last "uncovered" notification is the ref that keeps us polling */
  if (g_uncovered_notifications_pending == 1) {
    GPR_ASSERT(g_backup_poller == p);
    g_backup_poller = nullptr;
    g_uncovered_notifications_pending = 0;
    g_backup_poller_mu->Unlock();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO, "BACKUP_POLLER:%p shutdown", p);
    }
    grpc_pollset_shutdown(BACKUP_POLLER_POLLSET(p),
                          GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p,
                                            grpc_schedule_on_exec_ctx));
  } else {
    g_backup_poller_mu->Unlock();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO, "BACKUP_POLLER:%p reschedule", p);
    }
    grpc_core::Executor::Run(GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
                                               nullptr),
                             GRPC_ERROR_NONE);
  }
}

static void drop_uncovered(grpc_tcp* /*tcp*/) {
  int old_count;
  backup_poller* p;
  g_backup_poller_mu->Lock();
  p = g_backup_poller;
  old_count = g_uncovered_notifications_pending--;
  g_backup_poller_mu->Unlock();
  GPR_ASSERT(old_count > 1);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "BACKUP_POLLER:%p uncover cnt %d->%d", p, old_count,
            old_count - 1);
  }
}

// gRPC API considers a Write operation to be done the moment it clears 'flow
// control' i.e., not necessarily sent on the wire. This means that the
// application MIGHT not call `grpc_completion_queue_next/pluck` in a timely
// manner when its `Write()` API is acked.
//
// We need to ensure that the fd is 'covered' (i.e being monitored by some
// polling thread and progress is made) and hence add it to a backup poller
// here.
static void cover_self(grpc_tcp* tcp) {
  backup_poller* p;
  g_backup_poller_mu->Lock();
  int old_count = 0;
  if (g_uncovered_notifications_pending == 0) {
    g_uncovered_notifications_pending = 2;
    p = static_cast<backup_poller*>(
        gpr_zalloc(sizeof(*p) + grpc_pollset_size()));
    g_backup_poller = p;
    grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
    g_backup_poller_mu->Unlock();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO, "BACKUP_POLLER:%p create", p);
    }
    grpc_core::Executor::Run(
        GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, nullptr),
        GRPC_ERROR_NONE);
  } else {
    old_count = g_uncovered_notifications_pending++;
    p = g_backup_poller;
    g_backup_poller_mu->Unlock();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "BACKUP_POLLER:%p add %p cnt %d->%d", p, tcp,
            old_count - 1, old_count);
  }
  grpc_pollset_add_fd(BACKUP_POLLER_POLLSET(p), tcp->em_fd);
}
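
// Accounting sketch (illustrative): the first uncovered write sets the
// pending count to 2: one ref that keeps the backup poller itself alive and
// one for the notification being covered. Each later cover_self() increments
// the count and each drop_uncovered() decrements it; when run_poller()
// observes a count of 1, only the poller's own ref remains, so it shuts the
// pollset down.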

static void notify_on_read(grpc_tcp* tcp) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "TCP:%p notify_on_read", tcp);
  }
  grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_done_closure);
}

static void notify_on_write(grpc_tcp* tcp) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "TCP:%p notify_on_write", tcp);
  }
  if (!grpc_event_engine_run_in_background()) {
    cover_self(tcp);
  }
  grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure);
}

static void tcp_drop_uncovered_then_handle_write(void* arg,
                                                 grpc_error_handle error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "TCP:%p got_write: %s", arg,
            grpc_error_std_string(error).c_str());
  }
  drop_uncovered(static_cast<grpc_tcp*>(arg));
  tcp_handle_write(arg, error);
}

static void add_to_estimate(grpc_tcp* tcp, size_t bytes) {
  tcp->bytes_read_this_round += static_cast<double>(bytes);
}

static void finish_estimate(grpc_tcp* tcp) {
  /* If we read >80% of the target buffer in one read loop, increase the size
     of the target buffer to either the amount read, or twice its previous
     value */
  if (tcp->bytes_read_this_round > tcp->target_length * 0.8) {
    tcp->target_length =
        std::max(2 * tcp->target_length, tcp->bytes_read_this_round);
  } else {
    tcp->target_length =
        0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round;
  }
  tcp->bytes_read_this_round = 0;
}
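
// Worked example (illustrative, not in the upstream file): with
// target_length = 1024, a round that reads 900 bytes (> 80% of the target)
// grows the target to max(2 * 1024, 900) = 2048. A round that reads only 100
// bytes instead decays it gently: 0.99 * 1024 + 0.01 * 100 = 1014.76.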

static grpc_error_handle tcp_annotate_error(grpc_error_handle src_error,
                                            grpc_tcp* tcp) {
  return grpc_error_set_str(
      grpc_error_set_int(
          grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
          /* All tcp errors are marked with UNAVAILABLE so that application
           * may choose to retry. */
          GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
      GRPC_ERROR_STR_TARGET_ADDRESS, tcp->peer_string);
}

static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error);

static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  ZerocopyDisableAndWaitForRemaining(tcp);
  grpc_fd_shutdown(tcp->em_fd, why);
}

static void tcp_free(grpc_tcp* tcp) {
  grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
                 "tcp_unref_orphan");
  grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
  /* The lock is not really necessary here, since all refs have been
   * released */
  gpr_mu_lock(&tcp->tb_mu);
  grpc_core::TracedBuffer::Shutdown(
      &tcp->tb_head, tcp->outgoing_buffer_arg,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed"));
  gpr_mu_unlock(&tcp->tb_mu);
  tcp->outgoing_buffer_arg = nullptr;
  gpr_mu_destroy(&tcp->tb_mu);
  delete tcp;
}

#ifndef NDEBUG
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), DEBUG_LOCATION)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), DEBUG_LOCATION)
static void tcp_unref(grpc_tcp* tcp, const char* reason,
                      const grpc_core::DebugLocation& debug_location) {
  if (GPR_UNLIKELY(tcp->refcount.Unref(debug_location, reason))) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp, const char* reason,
                    const grpc_core::DebugLocation& debug_location) {
  tcp->refcount.Ref(debug_location, reason);
}
#else
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp* tcp) {
  if (GPR_UNLIKELY(tcp->refcount.Unref())) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp) { tcp->refcount.Ref(); }
#endif

static void tcp_destroy(grpc_endpoint* ep) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
  if (grpc_event_engine_can_track_errors()) {
    ZerocopyDisableAndWaitForRemaining(tcp);
    gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
    grpc_fd_set_error(tcp->em_fd);
  }
  TCP_UNREF(tcp, "destroy");
}

static void perform_reclamation(grpc_tcp* tcp)
    ABSL_LOCKS_EXCLUDED(tcp->read_mu) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "TCP: benign reclamation to free memory");
  }
  tcp->read_mu.Lock();
  if (tcp->incoming_buffer != nullptr) {
    grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
  }
  tcp->read_mu.Unlock();
  tcp->has_posted_reclaimer = false;
}

static void maybe_post_reclaimer(grpc_tcp* tcp)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
  if (!tcp->has_posted_reclaimer) {
    tcp->has_posted_reclaimer = true;
    tcp->memory_owner.PostReclaimer(
        grpc_core::ReclamationPass::kBenign,
        [tcp](absl::optional<grpc_core::ReclamationSweep> sweep) {
          if (!sweep.has_value()) return;
          perform_reclamation(tcp);
        });
  }
}

static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
  grpc_closure* cb = tcp->read_cb;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
    size_t i;
    gpr_log(GPR_INFO, "READ %p (peer=%s) error=%s", tcp,
            tcp->peer_string.c_str(), grpc_error_std_string(error).c_str());
    if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
      for (i = 0; i < tcp->incoming_buffer->count; i++) {
        char* dump = grpc_dump_slice(tcp->incoming_buffer->slices[i],
                                     GPR_DUMP_HEX | GPR_DUMP_ASCII);
        gpr_log(GPR_DEBUG, "DATA: %s", dump);
        gpr_free(dump);
      }
    }
  }
}

/* Returns true if data available to read or error other than EAGAIN. */
#define MAX_READ_IOVEC 4
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
  GPR_TIMER_SCOPE("tcp_do_read", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "TCP:%p do_read", tcp);
  }
  struct msghdr msg;
  struct iovec iov[MAX_READ_IOVEC];
  ssize_t read_bytes;
  size_t total_read_bytes = 0;
  size_t iov_len =
      std::min<size_t>(MAX_READ_IOVEC, tcp->incoming_buffer->count);
#ifdef GRPC_LINUX_ERRQUEUE
  constexpr size_t cmsg_alloc_space =
      CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) + CMSG_SPACE(sizeof(int));
#else
  constexpr size_t cmsg_alloc_space = 24 /* CMSG_SPACE(sizeof(int)) */;
#endif /* GRPC_LINUX_ERRQUEUE */
  char cmsgbuf[cmsg_alloc_space];
  for (size_t i = 0; i < iov_len; i++) {
    iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
    iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
  }

  GPR_ASSERT(tcp->incoming_buffer->length != 0);
  GPR_DEBUG_ASSERT(tcp->min_progress_size > 0);

  do {
    /* Assume there is something on the queue. If we receive TCP_INQ from
     * kernel, we will update this value, otherwise, we have to assume there
     * is always something to read until we get EAGAIN. */
    tcp->inq = 1;

    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = static_cast<msg_iovlen_type>(iov_len);
    if (tcp->inq_capable) {
      msg.msg_control = cmsgbuf;
      msg.msg_controllen = sizeof(cmsgbuf);
    } else {
      msg.msg_control = nullptr;
      msg.msg_controllen = 0;
    }
    msg.msg_flags = 0;

    GRPC_STATS_INC_TCP_READ_OFFER(tcp->incoming_buffer->length);
    GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(tcp->incoming_buffer->count);

    do {
      GPR_TIMER_SCOPE("recvmsg", 0);
      GRPC_STATS_INC_SYSCALL_READ();
      read_bytes = recvmsg(tcp->fd, &msg, 0);
    } while (read_bytes < 0 && errno == EINTR);

    /* We have read something in previous reads. We need to deliver those
     * bytes to the upper layer. */
    if (read_bytes <= 0 &&
        total_read_bytes >= static_cast<size_t>(tcp->min_progress_size)) {
      tcp->inq = 1;
      break;
    }

    if (read_bytes < 0) {
      /* NB: After calling call_read_cb a parallel call of the read handler
       * may be running. */
      if (errno == EAGAIN) {
        if (total_read_bytes > 0) {
          break;
        }
        finish_estimate(tcp);
        tcp->inq = 0;
        return false;
      } else {
        grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
        *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp);
        return true;
      }
    }
    if (read_bytes == 0) {
      /* 0 read size ==> end of stream
       *
       * We may have read something, i.e., total_read_bytes > 0, but since
       * the connection is closed we will drop the data here, because we
       * can't call the callback multiple times. */
      grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
      *error = tcp_annotate_error(
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp);
      return true;
    }

    GRPC_STATS_INC_TCP_READ_SIZE(read_bytes);
    add_to_estimate(tcp, static_cast<size_t>(read_bytes));
    GPR_DEBUG_ASSERT((size_t)read_bytes <=
                     tcp->incoming_buffer->length - total_read_bytes);

#ifdef GRPC_HAVE_TCP_INQ
    if (tcp->inq_capable) {
      GPR_DEBUG_ASSERT(!(msg.msg_flags & MSG_CTRUNC));
      struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
      for (; cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
        if (cmsg->cmsg_level == SOL_TCP && cmsg->cmsg_type == TCP_CM_INQ &&
            cmsg->cmsg_len == CMSG_LEN(sizeof(int))) {
          tcp->inq = *reinterpret_cast<int*>(CMSG_DATA(cmsg));
          break;
        }
      }
    }
#endif /* GRPC_HAVE_TCP_INQ */

    total_read_bytes += read_bytes;
    if (tcp->inq == 0 || total_read_bytes == tcp->incoming_buffer->length) {
      break;
    }

    /* We had a partial read, and still have space to read more data.
     * So, adjust IOVs and try to read more. */
    size_t remaining = read_bytes;
    size_t j = 0;
    for (size_t i = 0; i < iov_len; i++) {
      if (remaining >= iov[i].iov_len) {
        remaining -= iov[i].iov_len;
        continue;
      }
      if (remaining > 0) {
        iov[j].iov_base = static_cast<char*>(iov[i].iov_base) + remaining;
        iov[j].iov_len = iov[i].iov_len - remaining;
        remaining = 0;
      } else {
        iov[j].iov_base = iov[i].iov_base;
        iov[j].iov_len = iov[i].iov_len;
      }
      ++j;
    }
    iov_len = j;
  } while (true);

  if (tcp->inq == 0) {
    finish_estimate(tcp);
  }

  GPR_DEBUG_ASSERT(total_read_bytes > 0);
  *error = GRPC_ERROR_NONE;
  if (tcp->frame_size_tuning_enabled) {
    // Update min progress size based on the total number of bytes read in
    // this round.
    tcp->min_progress_size -= total_read_bytes;
    if (tcp->min_progress_size > 0) {
      // There are still bytes left to be read before we can signal the read
      // as complete. Append the bytes read so far into last_read_buffer,
      // which serves as a staging buffer. Return false to indicate that
      // tcp_handle_read needs to be scheduled again.
      grpc_slice_buffer_move_first(tcp->incoming_buffer, total_read_bytes,
                                   &tcp->last_read_buffer);
      return false;
    } else {
      // The required number of bytes have been read. Append the bytes read
      // in this round into last_read_buffer. Then swap last_read_buffer and
      // incoming_buffer. Now incoming_buffer contains all the bytes read
      // since the start of the last tcp_read operation. last_read_buffer
      // would contain any spare space left in the incoming buffer. This
      // space will be used in the next tcp_read operation.
      tcp->min_progress_size = 1;
      grpc_slice_buffer_move_first(tcp->incoming_buffer, total_read_bytes,
                                   &tcp->last_read_buffer);
      grpc_slice_buffer_swap(&tcp->last_read_buffer, tcp->incoming_buffer);
      return true;
    }
  }
  if (total_read_bytes < tcp->incoming_buffer->length) {
    grpc_slice_buffer_trim_end(tcp->incoming_buffer,
                               tcp->incoming_buffer->length - total_read_bytes,
                               &tcp->last_read_buffer);
  }
  return true;
}
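
// Example of the staging flow above (illustrative): with min_progress_size =
// 4096 and frame size tuning enabled, a round that reads 1500 bytes moves
// them into last_read_buffer and returns false, so tcp_handle_read() re-arms
// and keeps reading. Once cumulative reads reach 4096, the staged bytes are
// swapped back into incoming_buffer, min_progress_size resets to 1, and the
// read completes.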

static void maybe_make_read_slices(grpc_tcp* tcp)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
  if (tcp->incoming_buffer->length <
          static_cast<size_t>(tcp->min_progress_size) &&
      tcp->incoming_buffer->count < MAX_READ_IOVEC) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO,
              "TCP:%p alloc_slices; min_chunk=%d max_chunk=%d target=%lf "
              "buf_len=%" PRIdPTR,
              tcp, tcp->min_read_chunk_size, tcp->max_read_chunk_size,
              tcp->target_length, tcp->incoming_buffer->length);
    }
    int target_length =
        std::max(static_cast<int>(tcp->target_length), tcp->min_progress_size);
    int extra_wanted =
        target_length - static_cast<int>(tcp->incoming_buffer->length);
    int min_read_chunk_size =
        std::max(tcp->min_read_chunk_size, tcp->min_progress_size);
    int max_read_chunk_size =
        std::max(tcp->max_read_chunk_size, tcp->min_progress_size);
    grpc_slice_buffer_add_indexed(
        tcp->incoming_buffer,
        tcp->memory_owner.MakeSlice(grpc_core::MemoryRequest(
            min_read_chunk_size,
            grpc_core::Clamp(extra_wanted, min_read_chunk_size,
                             max_read_chunk_size))));
    maybe_post_reclaimer(tcp);
  }
}

static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
  grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp,
            grpc_error_std_string(error).c_str());
  }
  tcp->read_mu.Lock();
  grpc_error_handle tcp_read_error;
  if (GPR_LIKELY(GRPC_ERROR_IS_NONE(error))) {
    maybe_make_read_slices(tcp);
    if (!tcp_do_read(tcp, &tcp_read_error)) {
      /* We've consumed the edge, request a new one */
      tcp->read_mu.Unlock();
      notify_on_read(tcp);
      return;
    }
    tcp_trace_read(tcp, tcp_read_error);
  } else {
    tcp_read_error = GRPC_ERROR_REF(error);
    grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
    grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
  }
  grpc_closure* cb = tcp->read_cb;
  tcp->read_cb = nullptr;
  tcp->incoming_buffer = nullptr;
  tcp->read_mu.Unlock();
  grpc_core::Closure::Run(DEBUG_LOCATION, cb, tcp_read_error);
  TCP_UNREF(tcp, "read");
}

static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
                     grpc_closure* cb, bool urgent, int min_progress_size) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  GPR_ASSERT(tcp->read_cb == nullptr);
  tcp->read_cb = cb;
  tcp->read_mu.Lock();
  tcp->incoming_buffer = incoming_buffer;
  tcp->min_progress_size =
      tcp->frame_size_tuning_enabled ? min_progress_size : 1;
  grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
  grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
  tcp->read_mu.Unlock();
  TCP_REF(tcp, "read");
  if (tcp->is_first_read) {
    /* Endpoint read called for the very first time. Register read callback
     * with the polling engine */
    tcp->is_first_read = false;
    notify_on_read(tcp);
  } else if (!urgent && tcp->inq == 0) {
    /* Upper layer asked to read more but we know there is no pending data to
     * read from previous reads. So, wait for POLLIN. */
    notify_on_read(tcp);
  } else {
    /* Not the first time. We may or may not have more bytes available. In any
     * case call tcp->read_done_closure (i.e tcp_handle_read()) which does the
     * right thing (i.e calls tcp_do_read() which either reads the available
     * bytes or calls notify_on_read() to be notified when new bytes become
     * available */
    grpc_core::Closure::Run(DEBUG_LOCATION, &tcp->read_done_closure,
                            GRPC_ERROR_NONE);
  }
}

/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
 * of bytes sent. */
ssize_t tcp_send(int fd, const struct msghdr* msg, int* saved_errno,
                 int additional_flags = 0) {
  GPR_TIMER_SCOPE("sendmsg", 1);
  ssize_t sent_length;
  do {
    /* TODO(klempner): Cork if this is a partial write */
    GRPC_STATS_INC_SYSCALL_WRITE();
    sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);
  } while (sent_length < 0 && (*saved_errno = errno) == EINTR);
  return sent_length;
}
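
// Usage sketch (illustrative): callers hand tcp_send() a fully populated
// msghdr and get sendmsg()'s return value back, with EINTR retried
// internally and the last errno reported through saved_errno:
//
//   int saved_errno = 0;
//   ssize_t n = tcp_send(tcp->fd, &msg, &saved_errno);
//   if (n < 0 && saved_errno == EAGAIN) { /* wait for POLLOUT */ }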

/** This is to be called if outgoing_buffer_arg is not null. On linux
 * platforms, this will call sendmsg with socket options set to collect
 * timestamps inside the kernel. On return, sent_length is set to the return
 * value of the sendmsg call. Returns false if setting the socket options
 * failed. This is not implemented for non-linux platforms currently, and
 * crashes out. */
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
                                      size_t sending_length,
                                      ssize_t* sent_length, int* saved_errno,
                                      int additional_flags = 0);

/** The callback function to be invoked when we get an error on the socket. */
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error_handle error);

static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
    grpc_tcp* tcp, grpc_slice_buffer* buf);

#ifdef GRPC_LINUX_ERRQUEUE
static bool process_errors(grpc_tcp* tcp);

static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
    grpc_tcp* tcp, grpc_slice_buffer* buf) {
  TcpZerocopySendRecord* zerocopy_send_record = nullptr;
  const bool use_zerocopy =
      tcp->tcp_zerocopy_send_ctx.enabled() &&
      tcp->tcp_zerocopy_send_ctx.threshold_bytes() < buf->length;
  if (use_zerocopy) {
    zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord();
    if (zerocopy_send_record == nullptr) {
      process_errors(tcp);
      zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord();
    }
    if (zerocopy_send_record != nullptr) {
      zerocopy_send_record->PrepareForSends(buf);
      GPR_DEBUG_ASSERT(buf->count == 0);
      GPR_DEBUG_ASSERT(buf->length == 0);
      tcp->outgoing_byte_idx = 0;
      tcp->outgoing_buffer = nullptr;
    }
  }
  return zerocopy_send_record;
}

static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {
  tcp->tcp_zerocopy_send_ctx.Shutdown();
  while (!tcp->tcp_zerocopy_send_ctx.AllSendRecordsEmpty()) {
    process_errors(tcp);
  }
}

static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
                                      size_t sending_length,
                                      ssize_t* sent_length, int* saved_errno,
                                      int additional_flags) {
  if (!tcp->socket_ts_enabled) {
    uint32_t opt = grpc_core::kTimestampingSocketOptions;
    if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
                   static_cast<void*>(&opt), sizeof(opt)) != 0) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
        gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket.");
      }
      return false;
    }
    tcp->bytes_counter = -1;
    tcp->socket_ts_enabled = true;
  }
  /* Set control message to indicate that you want timestamps. */
  union {
    char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
    struct cmsghdr align;
  } u;
  cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SO_TIMESTAMPING;
  cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
  *reinterpret_cast<int*>(CMSG_DATA(cmsg)) =
      grpc_core::kTimestampingRecordingOptions;
  msg->msg_control = u.cmsg_buf;
  msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));

  /* If there was an error on sendmsg the logic in tcp_flush will handle it. */
  ssize_t length = tcp_send(tcp->fd, msg, saved_errno, additional_flags);
  *sent_length = length;
  /* Only save timestamps if all the bytes were taken by sendmsg. */
  if (sending_length == static_cast<size_t>(length)) {
    gpr_mu_lock(&tcp->tb_mu);
    grpc_core::TracedBuffer::AddNewEntry(
        &tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length),
        tcp->fd, tcp->outgoing_buffer_arg);
    gpr_mu_unlock(&tcp->tb_mu);
    tcp->outgoing_buffer_arg = nullptr;
  }
  return true;
}

static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
                                            TcpZerocopySendRecord* record,
                                            uint32_t seq, const char* tag);
// Reads \a cmsg to process zerocopy control messages.
static void process_zerocopy(grpc_tcp* tcp, struct cmsghdr* cmsg) {
  GPR_DEBUG_ASSERT(cmsg);
  auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(cmsg));
  GPR_DEBUG_ASSERT(serr->ee_errno == 0);
  GPR_DEBUG_ASSERT(serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY);
  const uint32_t lo = serr->ee_info;
  const uint32_t hi = serr->ee_data;
  for (uint32_t seq = lo; seq <= hi; ++seq) {
    // TODO(arjunroy): It's likely that lo and hi refer to zerocopy sequence
    // numbers that are generated by a single call to grpc_endpoint_write; ie.
    // we can batch the unref operation. So, check if record is the same for
    // both; if so, batch the unref/put.
    TcpZerocopySendRecord* record =
        tcp->tcp_zerocopy_send_ctx.ReleaseSendRecord(seq);
    GPR_DEBUG_ASSERT(record);
    UnrefMaybePutZerocopySendRecord(tcp, record, seq, "CALLBACK RCVD");
  }
}
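
// Illustrative note: the kernel may coalesce completion notifications, so a
// single control message can cover a range of sequence numbers. With
// ee_info = 3 and ee_data = 5, the loop above releases the records mapped to
// sequence numbers 3, 4 and 5, doing one Unref()/Put per completed sendmsg().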

// Whether the cmsg received from error queue is of the IPv4 or IPv6 levels.
static bool CmsgIsIpLevel(const cmsghdr& cmsg) {
  return (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR) ||
         (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR);
}

static bool CmsgIsZeroCopy(const cmsghdr& cmsg) {
  if (!CmsgIsIpLevel(cmsg)) {
    return false;
  }
  auto serr = reinterpret_cast<const sock_extended_err*> CMSG_DATA(&cmsg);
  return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY;
}

/** Reads \a cmsg to derive timestamps from the control messages. If a valid
 * timestamp is found, the traced buffer list is updated with this timestamp.
 * The caller of this function should be looping on the control messages
 * found in \a msg. \a cmsg should point to the control message that the
 * caller wants processed. On return, a pointer to a control message is
 * returned. On the next iteration, CMSG_NXTHDR(msg, ret_val) should be
 * passed as \a cmsg. */
struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
                                  struct cmsghdr* cmsg) {
  auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
  cmsghdr* opt_stats = nullptr;
  if (next_cmsg == nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_ERROR, "Received timestamp without extended error");
    }
    return cmsg;
  }

  /* Check if next_cmsg is an OPT_STATS msg */
  if (next_cmsg->cmsg_level == SOL_SOCKET &&
      next_cmsg->cmsg_type == SCM_TIMESTAMPING_OPT_STATS) {
    opt_stats = next_cmsg;
    next_cmsg = CMSG_NXTHDR(msg, opt_stats);
    if (next_cmsg == nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
        gpr_log(GPR_ERROR, "Received timestamp without extended error");
      }
      return opt_stats;
    }
  }

  if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
      !(next_cmsg->cmsg_type == IP_RECVERR ||
        next_cmsg->cmsg_type == IPV6_RECVERR)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_ERROR, "Unexpected control message");
    }
    return cmsg;
  }

  auto tss =
      reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg));
  auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
  if (serr->ee_errno != ENOMSG ||
      serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
    gpr_log(GPR_ERROR, "Unexpected control message");
    return cmsg;
  }
  /* The error handling can potentially be done on another thread so we need
   * to protect the traced buffer list. A lock free list might be better.
   * Using a simple mutex for now. */
  gpr_mu_lock(&tcp->tb_mu);
  grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, opt_stats,
                                            tss);
  gpr_mu_unlock(&tcp->tb_mu);
  return next_cmsg;
}

/** For linux platforms, reads the socket's error queue and processes error
 * messages from the queue.
 */
static bool process_errors(grpc_tcp* tcp) {
  bool processed_err = false;
  struct iovec iov;
  iov.iov_base = nullptr;
  iov.iov_len = 0;
  struct msghdr msg;
  msg.msg_name = nullptr;
  msg.msg_namelen = 0;
  msg.msg_iov = &iov;
  msg.msg_iovlen = 0;
  msg.msg_flags = 0;
  /* Allocate enough space so we don't need to keep increasing this as size
   * of OPT_STATS increase */
  constexpr size_t cmsg_alloc_space =
      CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) +
      CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) +
      CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t)));
  /* Allocate aligned space for cmsgs received along with timestamps */
  union {
    char rbuf[cmsg_alloc_space];
    struct cmsghdr align;
  } aligned_buf;
  msg.msg_control = aligned_buf.rbuf;
  int r, saved_errno;
  while (true) {
    msg.msg_controllen = sizeof(aligned_buf.rbuf);
    do {
      r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE);
      saved_errno = errno;
    } while (r < 0 && saved_errno == EINTR);

    if (r == -1 && saved_errno == EAGAIN) {
      return processed_err; /* No more errors to process */
    }
    if (r == -1) {
      return processed_err;
    }
    if (GPR_UNLIKELY((msg.msg_flags & MSG_CTRUNC) != 0)) {
      gpr_log(GPR_ERROR, "Error message was truncated.");
    }

    if (msg.msg_controllen == 0) {
      /* There was no control message found. It was probably spurious. */
      return processed_err;
    }
    bool seen = false;
    for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
         cmsg = CMSG_NXTHDR(&msg, cmsg)) {
      if (CmsgIsZeroCopy(*cmsg)) {
        process_zerocopy(tcp, cmsg);
        seen = true;
        processed_err = true;
      } else if (cmsg->cmsg_level == SOL_SOCKET &&
                 cmsg->cmsg_type == SCM_TIMESTAMPING) {
        cmsg = process_timestamp(tcp, &msg, cmsg);
        seen = true;
        processed_err = true;
      } else {
        /* Got a control message that is not a timestamp or zerocopy. Don't
         * know how to handle this. */
        if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
          gpr_log(GPR_INFO,
                  "unknown control message cmsg_level:%d cmsg_type:%d",
                  cmsg->cmsg_level, cmsg->cmsg_type);
        }
        return processed_err;
      }
    }
    if (!seen) {
      return processed_err;
    }
  }
}

static void tcp_handle_error(void* arg /* grpc_tcp */,
                             grpc_error_handle error) {
  grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp,
            grpc_error_std_string(error).c_str());
  }

  if (!GRPC_ERROR_IS_NONE(error) ||
      static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
    /* We aren't going to register to hear on error anymore, so it is safe to
     * unref. */
    TCP_UNREF(tcp, "error-tracking");
    return;
  }

  /* We are still interested in collecting timestamps, so let's try reading
   * them. */
  bool processed = process_errors(tcp);
  /* This might not be a timestamps error. Set the read and write closures to
   * be ready. */
  if (!processed) {
    grpc_fd_set_readable(tcp->em_fd);
    grpc_fd_set_writable(tcp->em_fd);
  }
  grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
}

#else /* GRPC_LINUX_ERRQUEUE */
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
    grpc_tcp* /*tcp*/, grpc_slice_buffer* /*buf*/) {
  return nullptr;
}

static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* /*tcp*/) {}

static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/,
                                      size_t /*sending_length*/,
                                      ssize_t* /*sent_length*/,
                                      int* /* saved_errno */,
                                      int /*additional_flags*/) {
  gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
  GPR_ASSERT(0);
  return false;
}

static void tcp_handle_error(void* /*arg*/ /* grpc_tcp */,
                             grpc_error_handle /*error*/) {
  gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
  GPR_ASSERT(0);
}
#endif /* GRPC_LINUX_ERRQUEUE */

/* If outgoing_buffer_arg is filled, shuts down the list early, so that any
 * release operations needed can be performed on the arg */
void tcp_shutdown_buffer_list(grpc_tcp* tcp) {
  if (tcp->outgoing_buffer_arg) {
    gpr_mu_lock(&tcp->tb_mu);
    grpc_core::TracedBuffer::Shutdown(
        &tcp->tb_head, tcp->outgoing_buffer_arg,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("TracedBuffer list shutdown"));
    gpr_mu_unlock(&tcp->tb_mu);
    tcp->outgoing_buffer_arg = nullptr;
  }
}

#if defined(IOV_MAX) && IOV_MAX < 260
#define MAX_WRITE_IOVEC IOV_MAX
#else
#define MAX_WRITE_IOVEC 260
#endif
msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
                                                    size_t* unwind_byte_idx,
                                                    size_t* sending_length,
                                                    iovec* iov) {
  msg_iovlen_type iov_size;
  *unwind_slice_idx = out_offset_.slice_idx;
  *unwind_byte_idx = out_offset_.byte_idx;
  for (iov_size = 0;
       out_offset_.slice_idx != buf_.count && iov_size != MAX_WRITE_IOVEC;
       iov_size++) {
    iov[iov_size].iov_base =
        GRPC_SLICE_START_PTR(buf_.slices[out_offset_.slice_idx]) +
        out_offset_.byte_idx;
    iov[iov_size].iov_len =
        GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]) -
        out_offset_.byte_idx;
    *sending_length += iov[iov_size].iov_len;
    ++(out_offset_.slice_idx);
    out_offset_.byte_idx = 0;
  }
  GPR_DEBUG_ASSERT(iov_size > 0);
  return iov_size;
}

void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
                                                     size_t actually_sent) {
  size_t trailing = sending_length - actually_sent;
  while (trailing > 0) {
    size_t slice_length;
    out_offset_.slice_idx--;
    slice_length = GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]);
    if (slice_length > trailing) {
      out_offset_.byte_idx = slice_length - trailing;
      break;
    } else {
      trailing -= slice_length;
    }
  }
}
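
// Worked example (illustrative): suppose PopulateIovs() offered two slices
// of 100 and 50 bytes (sending_length = 150, slice_idx advanced to 2) but
// sendmsg() accepted only 120 bytes. Then trailing = 30; stepping back to
// slice 1 (length 50 > 30) leaves out_offset_ at slice_idx = 1, byte_idx =
// 50 - 30 = 20, so the next flush resumes 20 bytes into the second slice.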

// returns true if done, false if pending; if returning true, *error is set
static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
                                  grpc_error_handle* error) {
  msg_iovlen_type iov_size;
  ssize_t sent_length = 0;
  size_t sending_length;
  size_t unwind_slice_idx;
  size_t unwind_byte_idx;
  bool tried_sending_message;
  int saved_errno;
  msghdr msg;
  // iov consumes a large space. Keep it as the last item on the stack to
  // improve locality. After all, we expect only the first elements of it
  // being populated in most cases.
  iovec iov[MAX_WRITE_IOVEC];
  while (true) {
    sending_length = 0;
    iov_size = record->PopulateIovs(&unwind_slice_idx, &unwind_byte_idx,
                                    &sending_length, iov);
    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iov_size;
    msg.msg_flags = 0;
    tried_sending_message = false;
    // Before calling sendmsg (with or without timestamps): we take a single
    // ref on the zerocopy send record.
    tcp->tcp_zerocopy_send_ctx.NoteSend(record);
    saved_errno = 0;
    if (tcp->outgoing_buffer_arg != nullptr) {
      if (!tcp->ts_capable ||
          !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
                                     &saved_errno, MSG_ZEROCOPY)) {
        /* We could not set socket options to collect Fathom timestamps.
         * Fallback on writing without timestamps. */
        tcp->ts_capable = false;
        tcp_shutdown_buffer_list(tcp);
      } else {
        tried_sending_message = true;
      }
    }
    if (!tried_sending_message) {
      msg.msg_control = nullptr;
      msg.msg_controllen = 0;
      GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
      GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
      sent_length = tcp_send(tcp->fd, &msg, &saved_errno, MSG_ZEROCOPY);
    }
    if (sent_length < 0) {
      // If this particular send failed, drop ref taken earlier in this
      // method.
      tcp->tcp_zerocopy_send_ctx.UndoSend();
      if (saved_errno == EAGAIN) {
        record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
        return false;
      } else if (saved_errno == EPIPE) {
        *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
        tcp_shutdown_buffer_list(tcp);
        return true;
      } else {
        *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
        tcp_shutdown_buffer_list(tcp);
        return true;
      }
    }
    tcp->bytes_counter += sent_length;
    record->UpdateOffsetForBytesSent(sending_length,
                                     static_cast<size_t>(sent_length));
    if (record->AllSlicesSent()) {
      *error = GRPC_ERROR_NONE;
      return true;
    }
  }
}

static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
                                            TcpZerocopySendRecord* record,
                                            uint32_t /*seq*/,
                                            const char* /*tag*/) {
  if (record->Unref()) {
    tcp->tcp_zerocopy_send_ctx.PutSendRecord(record);
  }
}

static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
                               grpc_error_handle* error) {
  bool done = do_tcp_flush_zerocopy(tcp, record, error);
  if (done) {
    // Either we encountered an error, or we successfully sent all the bytes.
    // In either case, we're done with this record.
    UnrefMaybePutZerocopySendRecord(tcp, record, 0, "flush_done");
  }
  return done;
}

static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {
  struct msghdr msg;
  struct iovec iov[MAX_WRITE_IOVEC];
  msg_iovlen_type iov_size;
  ssize_t sent_length = 0;
  size_t sending_length;
  size_t trailing;
  size_t unwind_slice_idx;
  size_t unwind_byte_idx;
  int saved_errno;

  // We always start at zero, because we eagerly unref and trim the slice
  // buffer as we write
  size_t outgoing_slice_idx = 0;

  while (true) {
    sending_length = 0;
    unwind_slice_idx = outgoing_slice_idx;
    unwind_byte_idx = tcp->outgoing_byte_idx;
    for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count &&
                       iov_size != MAX_WRITE_IOVEC;
         iov_size++) {
      iov[iov_size].iov_base =
          GRPC_SLICE_START_PTR(
              tcp->outgoing_buffer->slices[outgoing_slice_idx]) +
          tcp->outgoing_byte_idx;
      iov[iov_size].iov_len =
          GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) -
          tcp->outgoing_byte_idx;
      sending_length += iov[iov_size].iov_len;
      outgoing_slice_idx++;
      tcp->outgoing_byte_idx = 0;
    }
    GPR_ASSERT(iov_size > 0);

    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iov_size;
    msg.msg_flags = 0;
    bool tried_sending_message = false;
    saved_errno = 0;
    if (tcp->outgoing_buffer_arg != nullptr) {
      if (!tcp->ts_capable ||
          !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
                                     &saved_errno)) {
        /* We could not set socket options to collect Fathom timestamps.
         * Fallback on writing without timestamps. */
        tcp->ts_capable = false;
        tcp_shutdown_buffer_list(tcp);
      } else {
        tried_sending_message = true;
      }
    }
    if (!tried_sending_message) {
      msg.msg_control = nullptr;
      msg.msg_controllen = 0;

      GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
      GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);

      sent_length = tcp_send(tcp->fd, &msg, &saved_errno);
    }

    if (sent_length < 0) {
      if (saved_errno == EAGAIN) {
        tcp->outgoing_byte_idx = unwind_byte_idx;
        // unref all and forget about all slices that have been written to
        // this point
        for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
          grpc_slice_buffer_remove_first(tcp->outgoing_buffer);
        }
        return false;
      } else if (saved_errno == EPIPE) {
        *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
        grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
        tcp_shutdown_buffer_list(tcp);
        return true;
      } else {
        *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
        grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
        tcp_shutdown_buffer_list(tcp);
        return true;
      }
    }

    GPR_ASSERT(tcp->outgoing_byte_idx == 0);
    tcp->bytes_counter += sent_length;
    trailing = sending_length - static_cast<size_t>(sent_length);
    while (trailing > 0) {
      size_t slice_length;

      outgoing_slice_idx--;
      slice_length =
          GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]);
      if (slice_length > trailing) {
        tcp->outgoing_byte_idx = slice_length - trailing;
        break;
      } else {
        trailing -= slice_length;
      }
    }
    if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
      *error = GRPC_ERROR_NONE;
      grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
      return true;
    }
  }
}

static void tcp_handle_write(void* arg /* grpc_tcp */,
                             grpc_error_handle error) {
  grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
  grpc_closure* cb;

  if (!GRPC_ERROR_IS_NONE(error)) {
    cb = tcp->write_cb;
    tcp->write_cb = nullptr;
    if (tcp->current_zerocopy_send != nullptr) {
      UnrefMaybePutZerocopySendRecord(tcp, tcp->current_zerocopy_send, 0,
                                      "handle_write_err");
      tcp->current_zerocopy_send = nullptr;
    }
    grpc_core::Closure::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error));
    TCP_UNREF(tcp, "write");
    return;
  }

  bool flush_result =
      tcp->current_zerocopy_send != nullptr
          ? tcp_flush_zerocopy(tcp, tcp->current_zerocopy_send, &error)
          : tcp_flush(tcp, &error);
  if (!flush_result) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO, "write: delayed");
    }
    notify_on_write(tcp);
    // tcp_flush does not populate error if it has returned false.
    GPR_DEBUG_ASSERT(GRPC_ERROR_IS_NONE(error));
  } else {
    cb = tcp->write_cb;
    tcp->write_cb = nullptr;
    tcp->current_zerocopy_send = nullptr;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO, "write: %s", grpc_error_std_string(error).c_str());
    }
    // No need to take a ref on error since tcp_flush provides a ref.
    grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
    TCP_UNREF(tcp, "write");
  }
}

static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
                      grpc_closure* cb, void* arg, int /*max_frame_size*/) {
  GPR_TIMER_SCOPE("tcp_write", 0);
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  grpc_error_handle error = GRPC_ERROR_NONE;
  TcpZerocopySendRecord* zerocopy_send_record = nullptr;

  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    size_t i;

    for (i = 0; i < buf->count; i++) {
      gpr_log(GPR_INFO, "WRITE %p (peer=%s)", tcp, tcp->peer_string.c_str());
      if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
        char* data =
            grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
        gpr_log(GPR_DEBUG, "DATA: %s", data);
        gpr_free(data);
      }
    }
  }

  GPR_ASSERT(tcp->write_cb == nullptr);
  GPR_DEBUG_ASSERT(tcp->current_zerocopy_send == nullptr);

  if (buf->length == 0) {
    grpc_core::Closure::Run(
        DEBUG_LOCATION, cb,
        grpc_fd_is_shutdown(tcp->em_fd)
            ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
                                 tcp)
            : GRPC_ERROR_NONE);
    tcp_shutdown_buffer_list(tcp);
    return;
  }

  zerocopy_send_record = tcp_get_send_zerocopy_record(tcp, buf);
  if (zerocopy_send_record == nullptr) {
    // Either not enough bytes, or couldn't allocate a zerocopy context.
    tcp->outgoing_buffer = buf;
    tcp->outgoing_byte_idx = 0;
  }
  tcp->outgoing_buffer_arg = arg;
  if (arg) {
    GPR_ASSERT(grpc_event_engine_can_track_errors());
  }

  bool flush_result =
      zerocopy_send_record != nullptr
          ? tcp_flush_zerocopy(tcp, zerocopy_send_record, &error)
          : tcp_flush(tcp, &error);
  if (!flush_result) {
    TCP_REF(tcp, "write");
    tcp->write_cb = cb;
    tcp->current_zerocopy_send = zerocopy_send_record;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO, "write: delayed");
    }
    notify_on_write(tcp);
  } else {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO, "write: %s", grpc_error_std_string(error).c_str());
    }
    grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
  }
}
1702 
1703 static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
1704  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1705  grpc_pollset_add_fd(pollset, tcp->em_fd);
1706 }
1707 
1708 static void tcp_add_to_pollset_set(grpc_endpoint* ep,
1709  grpc_pollset_set* pollset_set) {
1710  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1711  grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
1712 }
1713 
1714 static void tcp_delete_from_pollset_set(grpc_endpoint* ep,
1715  grpc_pollset_set* pollset_set) {
1716  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1717  grpc_pollset_set_del_fd(pollset_set, tcp->em_fd);
1718 }
1719 
1720 static absl::string_view tcp_get_peer(grpc_endpoint* ep) {
1721  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1722  return tcp->peer_string;
1723 }
1724 
1725 static absl::string_view tcp_get_local_address(grpc_endpoint* ep) {
1726  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1727  return tcp->local_address;
1728 }
1729 
1730 static int tcp_get_fd(grpc_endpoint* ep) {
1731  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1732  return tcp->fd;
1733 }
1734 
1735 static bool tcp_can_track_err(grpc_endpoint* ep) {
1736  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1737  if (!grpc_event_engine_can_track_errors()) {
1738  return false;
1739  }
1740  struct sockaddr addr;
1741  socklen_t len = sizeof(addr);
1742  if (getsockname(tcp->fd, &addr, &len) < 0) {
1743  return false;
1744  }
1745  return addr.sa_family == AF_INET || addr.sa_family == AF_INET6;
1746 }
1747 
1748 static const grpc_endpoint_vtable vtable = {tcp_read,
1749  tcp_write,
1750  tcp_add_to_pollset,
1751  tcp_add_to_pollset_set,
1752  tcp_delete_from_pollset_set,
1753  tcp_shutdown,
1754  tcp_destroy,
1755  tcp_get_peer,
1756  tcp_get_local_address,
1757  tcp_get_fd,
1758  tcp_can_track_err};
1759 
1760 #define MAX_CHUNK_SIZE (32 * 1024 * 1024)
1761 
1762 grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
1763  const grpc_channel_args* channel_args,
1764  absl::string_view peer_string) {
1765  static constexpr bool kZerocpTxEnabledDefault = false;
1766  int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
1767  int tcp_max_read_chunk_size = 4 * 1024 * 1024;
1768  int tcp_min_read_chunk_size = 256;
1769  bool tcp_tx_zerocopy_enabled = kZerocpTxEnabledDefault;
1770  int tcp_tx_zerocopy_send_bytes_thresh =
1771  grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold;
1772  int tcp_tx_zerocopy_max_simult_sends =
1773  grpc_core::TcpZerocopySendCtx::kDefaultMaxSends;
1774  if (channel_args != nullptr) {
1775  for (size_t i = 0; i < channel_args->num_args; i++) {
1776  if (0 ==
1777  strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
1778  grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
1779  tcp_read_chunk_size =
1780  grpc_channel_arg_get_integer(&channel_args->args[i], options);
1781  } else if (0 == strcmp(channel_args->args[i].key,
1782  GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) {
1783  grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
1784  tcp_min_read_chunk_size =
1785  grpc_channel_arg_get_integer(&channel_args->args[i], options);
1786  } else if (0 == strcmp(channel_args->args[i].key,
1787  GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) {
1788  grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
1789  tcp_max_read_chunk_size =
1790  grpc_channel_arg_get_integer(&channel_args->args[i], options);
1791  } else if (0 == strcmp(channel_args->args[i].key,
1792  GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) {
1793  tcp_tx_zerocopy_enabled = grpc_channel_arg_get_bool(
1794  &channel_args->args[i], kZerocpTxEnabledDefault);
1795  } else if (0 == strcmp(channel_args->args[i].key,
1796  GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD)) {
1797  grpc_integer_options options = {
1798  grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold, 0,
1799  INT_MAX};
1800  tcp_tx_zerocopy_send_bytes_thresh =
1801  grpc_channel_arg_get_integer(&channel_args->args[i], options);
1802  } else if (0 == strcmp(channel_args->args[i].key,
1803  GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)) {
1804  grpc_integer_options options = {
1805  grpc_core::TcpZerocopySendCtx::kDefaultMaxSends, 0, INT_MAX};
1806  tcp_tx_zerocopy_max_simult_sends =
1807  grpc_channel_arg_get_integer(&channel_args->args[i], options);
1808  }
1809  }
1810  }
1811 
1812  if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) {
1813  tcp_min_read_chunk_size = tcp_max_read_chunk_size;
1814  }
1815  tcp_read_chunk_size = grpc_core::Clamp(
1816  tcp_read_chunk_size, tcp_min_read_chunk_size, tcp_max_read_chunk_size);
1817 
1818  grpc_tcp* tcp = new grpc_tcp(tcp_tx_zerocopy_max_simult_sends,
1819  tcp_tx_zerocopy_send_bytes_thresh);
1820  tcp->base.vtable = &vtable;
1821  tcp->peer_string = std::string(peer_string);
1822  tcp->fd = grpc_fd_wrapped_fd(em_fd);
1823  tcp->memory_owner = grpc_core::ResourceQuotaFromChannelArgs(channel_args)
1824  ->memory_quota()
1825  ->CreateMemoryOwner(peer_string);
1826  tcp->self_reservation = tcp->memory_owner.MakeReservation(sizeof(grpc_tcp));
1827  grpc_resolved_address resolved_local_addr;
1828  memset(&resolved_local_addr, 0, sizeof(resolved_local_addr));
1829  resolved_local_addr.len = sizeof(resolved_local_addr.addr);
1830  absl::StatusOr<std::string> addr_uri;
1831  if (getsockname(tcp->fd,
1832  reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
1833  &resolved_local_addr.len) < 0 ||
1834  !(addr_uri = grpc_sockaddr_to_uri(&resolved_local_addr)).ok()) {
1835  tcp->local_address = "";
1836  } else {
1837  tcp->local_address = addr_uri.value();
1838  }
1839  tcp->read_cb = nullptr;
1840  tcp->write_cb = nullptr;
1841  tcp->current_zerocopy_send = nullptr;
1842  tcp->release_fd_cb = nullptr;
1843  tcp->release_fd = nullptr;
1844  tcp->target_length = static_cast<double>(tcp_read_chunk_size);
1845  tcp->min_read_chunk_size = tcp_min_read_chunk_size;
1846  tcp->max_read_chunk_size = tcp_max_read_chunk_size;
1847  tcp->bytes_read_this_round = 0;
1848  /* Will be set to false by the very first endpoint read function */
1849  tcp->is_first_read = true;
1850  tcp->has_posted_reclaimer = false;
1851  tcp->bytes_counter = -1;
1852  tcp->socket_ts_enabled = false;
1853  tcp->ts_capable = true;
1854  tcp->outgoing_buffer_arg = nullptr;
1855  tcp->frame_size_tuning_enabled = ExperimentalTcpFrameSizeTuningEnabled();
1856  tcp->min_progress_size = 1;
1857  if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {
1858 #ifdef GRPC_LINUX_ERRQUEUE
1859  const int enable = 1;
1860  auto err =
1861  setsockopt(tcp->fd, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable));
1862  if (err == 0) {
1863  tcp->tcp_zerocopy_send_ctx.set_enabled(true);
1864  } else {
1865  gpr_log(GPR_ERROR, "Failed to set zerocopy options on the socket.");
1866  }
1867 #endif
1868  }
1869  /* paired with unref in grpc_tcp_destroy */
1870  new (&tcp->refcount) grpc_core::RefCount(
1871  1, GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace) ? "tcp" : nullptr);
1872  gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
1873  tcp->em_fd = em_fd;
1874  grpc_slice_buffer_init(&tcp->last_read_buffer);
1875  gpr_mu_init(&tcp->tb_mu);
1876  tcp->tb_head = nullptr;
1877  GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp,
1878  grpc_schedule_on_exec_ctx);
1879  if (grpc_event_engine_run_in_background()) {
1880  // If there is a polling engine always running in the background, there is
1881  // no need to run the backup poller.
1882  GRPC_CLOSURE_INIT(&tcp->write_done_closure, tcp_handle_write, tcp,
1883  grpc_schedule_on_exec_ctx);
1884  } else {
1885  GRPC_CLOSURE_INIT(&tcp->write_done_closure,
1886  tcp_drop_uncovered_then_handle_write, tcp,
1887  grpc_schedule_on_exec_ctx);
1888  }
1889  /* Always assume there is something on the queue to read. */
1890  tcp->inq = 1;
1891 #ifdef GRPC_HAVE_TCP_INQ
1892  int one = 1;
1893  if (setsockopt(tcp->fd, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) {
1894  tcp->inq_capable = true;
1895  } else {
1896  gpr_log(GPR_DEBUG, "cannot set inq fd=%d errno=%d", tcp->fd, errno);
1897  tcp->inq_capable = false;
1898  }
1899 #else
1900  tcp->inq_capable = false;
1901 #endif /* GRPC_HAVE_TCP_INQ */
1902  /* Start being notified on errors if event engine can track errors. */
1903  if (grpc_event_engine_can_track_errors()) {
1904  /* Grab a ref to tcp so that we can safely access the tcp struct when
1905  * processing errors. We unref when we no longer want to track errors
1906  * separately. */
1907  TCP_REF(tcp, "error-tracking");
1908  gpr_atm_rel_store(&tcp->stop_error_notification, 0);
1909  GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
1910  grpc_schedule_on_exec_ctx);
1911  grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
1912  }
1913 
1914  return &tcp->base;
1915 }
1916 
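[Note] For context on the knobs parsed in grpc_tcp_create() above: callers tune them through ordinary channel args. A sketch of how the same GRPC_ARG_* keys might be populated; the keys are the real macros matched in the loop, while the surrounding setup is illustrative only:

    #include <grpc/grpc.h>

    // Build channel args that exercise the tuning path in grpc_tcp_create():
    // a 64 KiB read chunk target and opt-in TX zerocopy. `args` is static
    // only to keep the sketch self-contained.
    static grpc_channel_args make_tcp_tuning_args() {
      static grpc_arg args[2];
      args[0].type = GRPC_ARG_INTEGER;
      args[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
      args[0].value.integer = 64 * 1024;  // clamped into [min, max] above
      args[1].type = GRPC_ARG_INTEGER;
      args[1].key = const_cast<char*>(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED);
      args[1].value.integer = 1;  // read via grpc_channel_arg_get_bool()
      return grpc_channel_args{2, args};
    }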
1917 int grpc_tcp_fd(grpc_endpoint* ep) {
1918  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1919  GPR_ASSERT(ep->vtable == &vtable);
1920  return grpc_fd_wrapped_fd(tcp->em_fd);
1921 }
1922 
1923 void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
1924  grpc_closure* done) {
1925  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
1926  GPR_ASSERT(ep->vtable == &vtable);
1927  tcp->release_fd = fd;
1928  tcp->release_fd_cb = done;
1929  grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
1930  if (grpc_event_engine_can_track_errors()) {
1931  /* Stop errors notification. */
1932  ZerocopyDisableAndWaitForRemaining(tcp);
1933  gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
1934  grpc_fd_set_error(tcp->em_fd);
1935  }
1936  TCP_UNREF(tcp, "destroy");
1937 }
1938 
1939 void grpc_tcp_posix_init() { g_backup_poller_mu = new grpc_core::Mutex; }
1940 
1941 void grpc_tcp_posix_shutdown() {
1942  delete g_backup_poller_mu;
1943  g_backup_poller_mu = nullptr;
1944 }
1945 
1946 #endif /* GRPC_POSIX_SOCKET_TCP */
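[Note] One piece of the zerocopy machinery configured above (SO_ZEROCOPY) lives outside this listing: completions for MSG_ZEROCOPY sends arrive on the socket's error queue and carry the range of send sequence numbers whose buffers the kernel has released. A Linux-only sketch of reading one such completion, assuming kernel 4.14+ headers and simplified relative to the error-queue handling elsewhere in this file:

    #include <cstdint>
    #include <cstring>
    #include <linux/errqueue.h>
    #include <netinet/in.h>
    #include <sys/socket.h>

    // Read one zerocopy completion from the error queue. On success,
    // [*lo, *hi] is the inclusive range of zerocopy send sequence numbers
    // whose buffers the kernel no longer references, so the corresponding
    // slices may be unpinned and reused.
    static bool read_zerocopy_completion(int fd, uint32_t* lo, uint32_t* hi) {
      char control[128];
      msghdr msg = {};
      msg.msg_control = control;
      msg.msg_controllen = sizeof(control);
      if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0) return false;  // nothing queued
      for (cmsghdr* cm = CMSG_FIRSTHDR(&msg); cm != nullptr;
           cm = CMSG_NXTHDR(&msg, cm)) {
        if ((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
            (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR)) {
          sock_extended_err serr;
          memcpy(&serr, CMSG_DATA(cm), sizeof(serr));
          if (serr.ee_origin == SO_EE_ORIGIN_ZEROCOPY) {
            *lo = serr.ee_info;  // first sequence number in the range
            *hi = serr.ee_data;  // last sequence number in the range
            return true;
          }
        }
      }
      return false;
    }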