#ifdef GRPC_POSIX_SOCKET_TCP

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>

#include <unordered_map>

#ifndef SOL_TCP
#define SOL_TCP IPPROTO_TCP
#endif

#define TCP_CM_INQ TCP_INQ

#ifdef GRPC_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS MSG_NOSIGNAL
#else
#define SENDMSG_FLAGS 0
#endif

// TCP zerocopy sendmsg() flag; defined here as a fallback for older kernel
// headers. The value is part of the kernel ABI and will not change.
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0x4000000
#endif

#ifdef GRPC_MSG_IOVLEN_TYPE
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
typedef size_t msg_iovlen_type;
#endif
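// Overview of the TX-zerocopy bookkeeping that follows: with MSG_ZEROCOPY,
// sendmsg() returns as soon as the kernel has *referenced* the payload pages
// rather than copied them, and completion is reported later on the socket's
// error queue. Until that notification arrives, the buffers handed to
// sendmsg() must stay alive. Each in-flight logical write is therefore
// tracked by a reference-counted TcpZerocopySendRecord that owns the slices
// being sent: one ref per sendmsg() issued, plus one for the tcp_write()
// itself.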
namespace grpc_core {

class TcpZerocopySendRecord {
 public:
  ~TcpZerocopySendRecord() {
    AssertEmpty();
    grpc_slice_buffer_destroy_internal(&buf_);
  }

  // Given the slices we wish to send, and the current offset into the slice
  // buffer, populate an iovec array for a zerocopy-enabled sendmsg().
  msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx,
                               size_t* unwind_byte_idx, size_t* sending_length,
                               iovec* iov);

  // A sendmsg() may be unable to send all requested bytes at this time,
  // returning EAGAIN (possibly due to backpressure); unwind the offsets so
  // those bytes are retried later.
  void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) {
    out_offset_.byte_idx = unwind_byte_idx;
    out_offset_.slice_idx = unwind_slice_idx;
  }

  // Update the offsets based on how much we wished to send and how much was
  // actually sent.
  void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent);

  bool AllSlicesSent() { return out_offset_.slice_idx == buf_.count; }

  // Take ownership of the slices to send and reset the send offsets.
  void PrepareForSends(grpc_slice_buffer* slices_to_send) {
    out_offset_.slice_idx = 0;
    out_offset_.byte_idx = 0;
    grpc_slice_buffer_swap(slices_to_send, &buf_);
    Ref();
  }

  // References: one per in-flight sendmsg(), plus one held by tcp_write().
  void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); }

  // Called when an error-queue notification arrives for a sendmsg(), when a
  // sendmsg() fails, or when tcp_write() is done. Returns true once the last
  // reference is dropped.
  bool Unref() {
    const intptr_t prior = ref_.fetch_sub(1, std::memory_order_acq_rel);
    if (prior == 1) {
      AllSendsComplete();
      return true;
    }
    return false;
  }

 private:
  struct OutgoingOffset {
    size_t slice_idx = 0;
    size_t byte_idx = 0;
  };

  void AssertEmpty() { GPR_DEBUG_ASSERT(buf_.count == 0); }

  void AllSendsComplete() {
    grpc_slice_buffer_reset_and_unref_internal(&buf_);
  }

  grpc_slice_buffer buf_;
  std::atomic<intptr_t> ref_{0};
  OutgoingOffset out_offset_;
};
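// TcpZerocopySendCtx pools a fixed number of TcpZerocopySendRecords (up to
// max_sends) and maps the kernel's implicit zerocopy sequence numbers back to
// the record for each send: the kernel numbers zerocopy sendmsg() calls on a
// socket 0, 1, 2, ... and acknowledges them (possibly coalesced into ranges)
// on the error queue, so last_send_ below mirrors that counter.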
class TcpZerocopySendCtx {
 public:
  static constexpr int kDefaultMaxSends = 4;
  static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;  // 16KB

  explicit TcpZerocopySendCtx(
      int max_sends = kDefaultMaxSends,
      size_t send_bytes_threshold = kDefaultSendBytesThreshold)
      : max_sends_(max_sends),
        free_send_records_size_(max_sends),
        threshold_bytes_(send_bytes_threshold) {
    send_records_ = static_cast<TcpZerocopySendRecord*>(
        gpr_malloc(max_sends * sizeof(*send_records_)));
    free_send_records_ = static_cast<TcpZerocopySendRecord**>(
        gpr_malloc(max_sends * sizeof(*free_send_records_)));
    if (send_records_ == nullptr || free_send_records_ == nullptr) {
      gpr_free(send_records_);
      gpr_free(free_send_records_);
      gpr_log(GPR_INFO, "Disabling TCP TX zerocopy due to memory pressure.\n");
      memory_limited_ = true;
    } else {
      for (int idx = 0; idx < max_sends_; ++idx) {
        new (send_records_ + idx) TcpZerocopySendRecord();
        free_send_records_[idx] = send_records_ + idx;
      }
    }
  }

  ~TcpZerocopySendCtx() {
    if (send_records_ != nullptr) {
      for (int idx = 0; idx < max_sends_; ++idx) {
        send_records_[idx].~TcpZerocopySendRecord();
      }
    }
    gpr_free(send_records_);
    gpr_free(free_send_records_);
  }

  // True if we were unable to allocate the bookkeeping structures at
  // transport initialization time.
  bool memory_limited() const { return memory_limited_; }

  // The kernel assigns an implicit sequence number to every zerocopy
  // sendmsg(); note the record for the sequence number we are about to use.
  void NoteSend(TcpZerocopySendRecord* record) {
    record->Ref();
    AssociateSeqWithSendRecord(last_send_, record);
    ++last_send_;
  }

  // If a zerocopy sendmsg() fails outright, roll back the speculatively
  // bumped sequence number and drop the ref taken in NoteSend().
  void UndoSend() {
    --last_send_;
    if (ReleaseSendRecord(last_send_)->Unref()) {
      // We should still be holding the ref taken by tcp_write().
      GPR_DEBUG_ASSERT(0);
    }
  }

  void AssociateSeqWithSendRecord(uint32_t seq, TcpZerocopySendRecord* record) {
    MutexLock guard(&lock_);
    ctx_lookup_.emplace(seq, record);
  }

  // Get a send record for a send that we wish to do with zerocopy.
  TcpZerocopySendRecord* GetSendRecord() {
    MutexLock guard(&lock_);
    return TryGetSendRecordLocked();
  }

  TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) {
    MutexLock guard(&lock_);
    return ReleaseSendRecordLocked(seq);
  }

  // After all refs to a send record are released, return it to the pool.
  void PutSendRecord(TcpZerocopySendRecord* record) {
    GPR_DEBUG_ASSERT(record >= send_records_ &&
                     record < send_records_ + max_sends_);
    MutexLock guard(&lock_);
    PutSendRecordLocked(record);
  }

  // Indicate that we are disposing of this zerocopy context.
  void Shutdown() { shutdown_.store(true, std::memory_order_release); }

  // True if there are no in-flight zerocopy sends.
  bool AllSendRecordsEmpty() {
    MutexLock guard(&lock_);
    return free_send_records_size_ == max_sends_;
  }

  bool enabled() const { return enabled_; }

  void set_enabled(bool enabled) { enabled_ = enabled; }

  // Only use zerocopy if the write is at least this many bytes.
  size_t threshold_bytes() const { return threshold_bytes_; }

 private:
  TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq) {
    auto iter = ctx_lookup_.find(seq);
    GPR_DEBUG_ASSERT(iter != ctx_lookup_.end());
    TcpZerocopySendRecord* record = iter->second;
    ctx_lookup_.erase(iter);
    return record;
  }

  TcpZerocopySendRecord* TryGetSendRecordLocked() {
    if (shutdown_.load(std::memory_order_acquire)) {
      return nullptr;
    }
    if (free_send_records_size_ == 0) {
      return nullptr;
    }
    free_send_records_size_--;
    return free_send_records_[free_send_records_size_];
  }

  void PutSendRecordLocked(TcpZerocopySendRecord* record) {
    GPR_DEBUG_ASSERT(free_send_records_size_ < max_sends_);
    free_send_records_[free_send_records_size_] = record;
    free_send_records_size_++;
  }

  TcpZerocopySendRecord* send_records_;
  TcpZerocopySendRecord** free_send_records_;
  int max_sends_;
  int free_send_records_size_;
  Mutex lock_;
  uint32_t last_send_ = 0;
  std::atomic<bool> shutdown_{false};
  bool enabled_ = false;
  size_t threshold_bytes_ = kDefaultSendBytesThreshold;
  std::unordered_map<uint32_t, TcpZerocopySendRecord*> ctx_lookup_;
  bool memory_limited_ = false;
};

}  // namespace grpc_core
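// Sketch of a single zerocopy write through this context (assuming the
// socket has SO_ZEROCOPY set and the write exceeds threshold_bytes()):
//
//   TcpZerocopySendRecord* rec = ctx.GetSendRecord();  // may be nullptr
//   rec->PrepareForSends(buf);         // take ownership of the slices
//   ctx.NoteSend(rec);                 // ref + bind kernel seq -> rec
//   sendmsg(fd, &msg, MSG_ZEROCOPY);   // kernel references the pages
//   ...                                // later, on the error queue:
//   rec = ctx.ReleaseSendRecord(seq);  // kernel acked sequence number seq
//   if (rec->Unref()) ctx.PutSendRecord(rec);  // slices may be freed now
//
// If the sendmsg() itself fails, UndoSend() rolls the sequence number back.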
using grpc_core::TcpZerocopySendCtx;
using grpc_core::TcpZerocopySendRecord;
// Whether the experiment to enable TCP frame size tuning is on.
bool ExperimentalTcpFrameSizeTuningEnabled() {
  static const bool kEnableTcpFrameSizeTuning =
      GPR_GLOBAL_CONFIG_GET(grpc_experimental_enable_tcp_frame_size_tuning);
  return kEnableTcpFrameSizeTuning;
}
struct grpc_tcp {
  grpc_tcp(int max_sends, size_t send_bytes_threshold)
      : tcp_zerocopy_send_ctx(max_sends, send_bytes_threshold) {}
  grpc_endpoint base;  // must be first so grpc_endpoint* casts work
  grpc_fd* em_fd;
  int fd;
  bool has_posted_reclaimer;
  double target_length;
  double bytes_read_this_round;
  grpc_core::RefCount refcount;
  int min_read_chunk_size;
  int max_read_chunk_size;
  // The byte within the first outgoing slice that has not yet been sent.
  size_t outgoing_byte_idx;
  // Used by the timestamps (TracedBuffer) machinery.
  void* outgoing_buffer_arg;
  // True once timestamping options have been set on the socket.
  bool socket_ts_enabled;
  // Set to true to signal the error handler to stop polling for errors.
  gpr_atm stop_error_notification;
  TcpZerocopySendCtx tcp_zerocopy_send_ctx;
  TcpZerocopySendRecord* current_zerocopy_send = nullptr;
  // If true, read buffers are sized according to the min_progress_size hints
  // conveyed by the upper layers.
  bool frame_size_tuning_enabled;
  // Minimum number of bytes that must be read before the read callback may be
  // invoked.
  int min_progress_size;
  std::string peer_string;
  std::string local_address;
};
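// The gRPC API considers a write "done" the moment it clears flow control,
// which can happen before the bytes actually hit the wire. If the application
// then stops polling, nobody would drive the fd to completion, so a shared
// "backup" poller is spun up on demand. g_uncovered_notifications_pending
// counts outstanding write notifications not covered by application polling;
// the count doubles as the ref that keeps the backup poller alive (see
// cover_self()/drop_uncovered() below).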
struct backup_poller {
  gpr_mu* pollset_mu;
  grpc_closure run_poller;
};

static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp);

// The pollset is allocated in the same block of memory, immediately after the
// backup_poller struct itself.
#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))

static grpc_core::Mutex* g_backup_poller_mu = nullptr;
static int g_uncovered_notifications_pending
    ABSL_GUARDED_BY(g_backup_poller_mu);
static backup_poller* g_backup_poller ABSL_GUARDED_BY(g_backup_poller_mu);
static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */,
                                                 grpc_error_handle error);

static void done_poller(void* bp, grpc_error_handle /*error_ignored*/) {
  backup_poller* p = static_cast<backup_poller*>(bp);
  grpc_pollset_destroy(BACKUP_POLLER_POLLSET(p));
  gpr_free(p);
}

static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
  backup_poller* p = static_cast<backup_poller*>(bp);
  gpr_mu_lock(p->pollset_mu);
  grpc_core::Timestamp deadline =
      grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(10);
  GRPC_LOG_IF_ERROR(
      "backup_poller:pollset_work",
      grpc_pollset_work(BACKUP_POLLER_POLLSET(p), nullptr, deadline));
  gpr_mu_unlock(p->pollset_mu);
  g_backup_poller_mu->Lock();
  // The last pending "uncovered" notification is the ref keeping this poller
  // alive; once it is the only one left, shut the poller down.
  if (g_uncovered_notifications_pending == 1) {
    g_backup_poller = nullptr;
    g_uncovered_notifications_pending = 0;
    g_backup_poller_mu->Unlock();
    grpc_pollset_shutdown(BACKUP_POLLER_POLLSET(p),
                          GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p,
                                            grpc_schedule_on_exec_ctx));
  } else {
    g_backup_poller_mu->Unlock();
    // Still covered by pending notifications: reschedule ourselves.
    grpc_core::Executor::Run(
        GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, nullptr),
        GRPC_ERROR_NONE);
  }
}
static void drop_uncovered(grpc_tcp* /*tcp*/) {
  int old_count;
  backup_poller* p;
  g_backup_poller_mu->Lock();
  p = g_backup_poller;
  old_count = g_uncovered_notifications_pending--;
  g_backup_poller_mu->Unlock();
  GPR_ASSERT(old_count > 1);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "BACKUP_POLLER:%p uncover cnt %d->%d", p, old_count,
            old_count - 1);
  }
}

static void cover_self(grpc_tcp* tcp) {
  backup_poller* p;
  g_backup_poller_mu->Lock();
  // A count of zero means no backup poller exists yet: create one. The count
  // starts at 2: one for the pending write, one "self" ref for the poller.
  if (g_uncovered_notifications_pending == 0) {
    g_uncovered_notifications_pending = 2;
    p = static_cast<backup_poller*>(
        gpr_zalloc(sizeof(*p) + grpc_pollset_size()));
    g_backup_poller = p;
    grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
    g_backup_poller_mu->Unlock();
    grpc_core::Executor::Run(
        GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, nullptr),
        GRPC_ERROR_NONE);
  } else {
    int old_count = g_uncovered_notifications_pending++;
    p = g_backup_poller;
    g_backup_poller_mu->Unlock();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO, "BACKUP_POLLER:%p add %p cnt %d->%d", p, tcp,
              old_count - 1, old_count);
    }
  }
  grpc_pollset_add_fd(BACKUP_POLLER_POLLSET(p), tcp->em_fd);
}
static void notify_on_read(grpc_tcp* tcp) {
  grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_done_closure);
}

static void notify_on_write(grpc_tcp* tcp) {
  if (!grpc_event_engine_run_in_background()) {
    cover_self(tcp);
  }
  grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure);
}

static void tcp_drop_uncovered_then_handle_write(void* arg,
                                                 grpc_error_handle error) {
  drop_uncovered(static_cast<grpc_tcp*>(arg));
  tcp_handle_write(arg, error);
}
static void add_to_estimate(grpc_tcp* tcp, size_t bytes) {
  tcp->bytes_read_this_round += static_cast<double>(bytes);
}

static void finish_estimate(grpc_tcp* tcp) {
  // If we read >80% of the target buffer in one read loop, grow the target
  // buffer to either the amount read or twice its previous value.
  if (tcp->bytes_read_this_round > tcp->target_length * 0.8) {
    tcp->target_length =
        std::max(2 * tcp->target_length, tcp->bytes_read_this_round);
  } else {
    // Otherwise decay the target slowly toward the observed read size.
    tcp->target_length =
        0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round;
  }
  tcp->bytes_read_this_round = 0;
}
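// Worked example of the estimator: with target_length = 8192, a round that
// reads 1024 bytes lands in the decay branch and yields
// 0.99 * 8192 + 0.01 * 1024 ~= 8120, so sustained small reads shrink the
// target only gradually; a single round that reads more than 6553 bytes
// (80% of 8192) immediately doubles the target, or jumps to the bytes read,
// whichever is larger.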
static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  ZerocopyDisableAndWaitForRemaining(tcp);
  grpc_fd_shutdown(tcp->em_fd, why);
}

static void tcp_free(grpc_tcp* tcp) {
  grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
                 "tcp_unref_orphan");
  grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
  // Shut down the traced-buffer list so pending timestamp entries can release
  // their callback arg.
  gpr_mu_lock(&tcp->tb_mu);
  grpc_core::TracedBuffer::Shutdown(
      &tcp->tb_head, tcp->outgoing_buffer_arg,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed"));
  gpr_mu_unlock(&tcp->tb_mu);
  tcp->outgoing_buffer_arg = nullptr;
  gpr_mu_destroy(&tcp->tb_mu);
  delete tcp;
}
#ifndef NDEBUG
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), DEBUG_LOCATION)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), DEBUG_LOCATION)
static void tcp_unref(grpc_tcp* tcp, const char* reason,
                      const grpc_core::DebugLocation& debug_location) {
  if (GPR_UNLIKELY(tcp->refcount.Unref(debug_location, reason))) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp, const char* reason,
                    const grpc_core::DebugLocation& debug_location) {
  tcp->refcount.Ref(debug_location, reason);
}
#else
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp* tcp) {
  if (GPR_UNLIKELY(tcp->refcount.Unref())) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp) { tcp->refcount.Ref(); }
#endif
static void tcp_destroy(grpc_endpoint* ep) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
  if (grpc_event_engine_can_track_errors()) {
    ZerocopyDisableAndWaitForRemaining(tcp);
    gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
    grpc_fd_set_error(tcp->em_fd);
  }
  TCP_UNREF(tcp, "destroy");
}

// Benign reclaimer: under memory pressure, drop the (not yet delivered)
// incoming buffer so its slices can be returned to the memory quota.
static void perform_reclamation(grpc_tcp* tcp)
    ABSL_LOCKS_EXCLUDED(tcp->read_mu) {
  tcp->read_mu.Lock();
  if (tcp->incoming_buffer != nullptr) {
    grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
  }
  tcp->read_mu.Unlock();
  tcp->has_posted_reclaimer = false;
}

static void maybe_post_reclaimer(grpc_tcp* tcp)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
  if (!tcp->has_posted_reclaimer) {
    tcp->has_posted_reclaimer = true;
    tcp->memory_owner.PostReclaimer(
        grpc_core::ReclamationPass::kBenign,
        [tcp](absl::optional<grpc_core::ReclamationSweep> sweep) {
          if (!sweep.has_value()) return;
          perform_reclamation(tcp);
        });
  }
}
static void tcp_trace_read(grpc_tcp* tcp, grpc_error_handle error)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    size_t i;
    gpr_log(GPR_INFO, "READ %p (peer=%s) error=%s", tcp,
            tcp->peer_string.c_str(), grpc_error_std_string(error).c_str());
    if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
      for (i = 0; i < tcp->incoming_buffer->count; i++) {
        char* dump = grpc_dump_slice(tcp->incoming_buffer->slices[i],
                                     GPR_DUMP_HEX | GPR_DUMP_ASCII);
        gpr_log(GPR_DEBUG, "DATA: %s", dump);
        gpr_free(dump);
      }
    }
  }
}

// Returns true if data is available to read or an error other than EAGAIN
// occurred, i.e. the read can be completed. MAX_READ_IOVEC bounds the scatter
// list handed to recvmsg().
#define MAX_READ_IOVEC 4
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
  struct msghdr msg;
  struct iovec iov[MAX_READ_IOVEC];
  ssize_t read_bytes;
  size_t total_read_bytes = 0;
  size_t iov_len =
      std::min<size_t>(MAX_READ_IOVEC, tcp->incoming_buffer->count);
#ifdef GRPC_LINUX_ERRQUEUE
  constexpr size_t cmsg_alloc_space =
      CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) + CMSG_SPACE(sizeof(int));
#else
  constexpr size_t cmsg_alloc_space = 24;  // CMSG_SPACE(sizeof(int))
#endif
  char cmsgbuf[cmsg_alloc_space];
  for (size_t i = 0; i < iov_len; i++) {
    iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
    iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
  }

  do {
    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = static_cast<msg_iovlen_type>(iov_len);
    if (tcp->inq_capable) {
      msg.msg_control = cmsgbuf;
      msg.msg_controllen = sizeof(cmsgbuf);
    } else {
      msg.msg_control = nullptr;
      msg.msg_controllen = 0;
    }

    do {
      read_bytes = recvmsg(tcp->fd, &msg, 0);
    } while (read_bytes < 0 && errno == EINTR);

    // We have read something in previous iterations: if we have now met the
    // minimum progress requirement, deliver those bytes to the upper layer.
    if (read_bytes <= 0 &&
        total_read_bytes >= static_cast<size_t>(tcp->min_progress_size)) {
      tcp->inq = 1;
      break;
    }
    if (read_bytes < 0) {
      // NB: after the read callback runs, a parallel call of the read handler
      // may already be in flight.
      if (errno == EAGAIN) {
        if (total_read_bytes > 0) {
          break;
        }
        finish_estimate(tcp);
        tcp->inq = 0;
        return false;
      } else {
        grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
        *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp);
        return true;
      }
    } else if (read_bytes == 0) {
      // A zero-byte read means the peer closed the connection.
      grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
      *error = tcp_annotate_error(
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp);
      return true;
    }

    add_to_estimate(tcp, static_cast<size_t>(read_bytes));
    GPR_DEBUG_ASSERT(static_cast<size_t>(read_bytes) <=
                     tcp->incoming_buffer->length - total_read_bytes);
    total_read_bytes += read_bytes;
#ifdef GRPC_HAVE_TCP_INQ
    if (tcp->inq_capable) {
      GPR_DEBUG_ASSERT(!(msg.msg_flags & MSG_CTRUNC));
      struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
      for (; cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
        if (cmsg->cmsg_level == SOL_TCP && cmsg->cmsg_type == TCP_CM_INQ &&
            cmsg->cmsg_len == CMSG_LEN(sizeof(int))) {
          tcp->inq = *reinterpret_cast<int*>(CMSG_DATA(cmsg));
          break;
        }
      }
    }
#endif /* GRPC_HAVE_TCP_INQ */

    // If the kernel reports nothing left to read, or the buffer is full, this
    // round is done.
    if (tcp->inq == 0 || total_read_bytes == tcp->incoming_buffer->length) {
      break;
    }
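    // TCP_INQ (Linux 4.18+) asks the kernel to attach a control message to
    // each recvmsg() carrying the number of bytes still queued in the
    // socket's receive buffer. Tracking tcp->inq lets us keep calling
    // recvmsg() while data is already waiting, and go back to the poller
    // (rather than burn a syscall) once inq hits zero.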
    // Some leading iovs may have been fully consumed: rebuild the iov array
    // so the next recvmsg() continues exactly where this one stopped.
    size_t remaining = read_bytes;
    size_t j = 0;
    for (size_t i = 0; i < iov_len; i++) {
      if (remaining >= iov[i].iov_len) {
        remaining -= iov[i].iov_len;
        continue;
      }
      if (remaining > 0) {
        iov[j].iov_base = static_cast<char*>(iov[i].iov_base) + remaining;
        iov[j].iov_len = iov[i].iov_len - remaining;
        remaining = 0;
      } else {
        iov[j].iov_base = iov[i].iov_base;
        iov[j].iov_len = iov[i].iov_len;
      }
      ++j;
    }
    iov_len = j;
  } while (true);

  finish_estimate(tcp);

  GPR_DEBUG_ASSERT(total_read_bytes > 0);
  *error = GRPC_ERROR_NONE;
  if (tcp->frame_size_tuning_enabled) {
    // Update min progress size based on the total number of bytes read in
    // this round.
    tcp->min_progress_size -= total_read_bytes;
    if (tcp->min_progress_size > 0) {
      // There are still bytes to read before the read can be signaled as
      // complete: stage what we have in last_read_buffer and ask for the read
      // handler to be rescheduled.
      grpc_slice_buffer_move_into(tcp->incoming_buffer,
                                  &tcp->last_read_buffer);
      return false;
    } else {
      // The required number of bytes have been read: hand everything staged
      // so far back to incoming_buffer.
      tcp->min_progress_size = 1;
      grpc_slice_buffer_move_into(tcp->incoming_buffer,
                                  &tcp->last_read_buffer);
      grpc_slice_buffer_swap(&tcp->last_read_buffer, tcp->incoming_buffer);
    }
  }

  if (total_read_bytes < tcp->incoming_buffer->length) {
    grpc_slice_buffer_trim_end(tcp->incoming_buffer,
                               tcp->incoming_buffer->length - total_read_bytes,
                               &tcp->last_read_buffer);
  }
  return true;
}
static void maybe_make_read_slices(grpc_tcp* tcp)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
  if (tcp->incoming_buffer->length <
          static_cast<size_t>(tcp->min_progress_size) &&
      tcp->incoming_buffer->count < MAX_READ_IOVEC) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO,
              "TCP:%p alloc_slices; min_chunk=%d max_chunk=%d target=%lf "
              "buf_len=%" PRIdPTR,
              tcp, tcp->min_read_chunk_size, tcp->max_read_chunk_size,
              tcp->target_length, tcp->incoming_buffer->length);
    }
    int target_length =
        std::max(static_cast<int>(tcp->target_length), tcp->min_progress_size);
    int extra_wanted =
        target_length - static_cast<int>(tcp->incoming_buffer->length);
    int min_read_chunk_size =
        std::max(tcp->min_read_chunk_size, tcp->min_progress_size);
    int max_read_chunk_size =
        std::max(tcp->max_read_chunk_size, tcp->min_progress_size);
    grpc_slice_buffer_add_indexed(
        tcp->incoming_buffer,
        tcp->memory_owner.MakeSlice(grpc_core::MemoryRequest(
            min_read_chunk_size,
            grpc_core::Clamp(extra_wanted, min_read_chunk_size,
                             max_read_chunk_size))));
  }
}
static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
  grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
  grpc_error_handle tcp_read_error;
  tcp->read_mu.Lock();
  if (GPR_LIKELY(error == GRPC_ERROR_NONE)) {
    maybe_make_read_slices(tcp);
    if (!tcp_do_read(tcp, &tcp_read_error)) {
      // We consumed the edge but have not produced enough data yet: release
      // the lock and wait for the fd to become readable again.
      tcp->read_mu.Unlock();
      notify_on_read(tcp);
      return;
    }
    tcp_trace_read(tcp, tcp_read_error);
  } else {
    tcp_read_error = GRPC_ERROR_REF(error);
    grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
  }
  grpc_closure* cb = tcp->read_cb;
  tcp->read_cb = nullptr;
  tcp->incoming_buffer = nullptr;
  tcp->read_mu.Unlock();
  grpc_core::Closure::Run(DEBUG_LOCATION, cb, tcp_read_error);
  TCP_UNREF(tcp, "read");
}
static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
                     grpc_closure* cb, bool urgent, int min_progress_size) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  GPR_ASSERT(tcp->read_cb == nullptr);
  tcp->read_cb = cb;
  tcp->read_mu.Lock();
  tcp->incoming_buffer = incoming_buffer;
  tcp->min_progress_size =
      tcp->frame_size_tuning_enabled ? min_progress_size : 1;
  grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
  grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
  tcp->read_mu.Unlock();
  TCP_REF(tcp, "read");
  if (tcp->is_first_read) {
    // Endpoint read called for the very first time: register the read
    // callback with the polling engine.
    tcp->is_first_read = false;
    notify_on_read(tcp);
  } else if (!urgent && tcp->inq == 0) {
    // The upper layer asked for more data, but TCP_INQ told us the kernel has
    // nothing pending: wait for POLLIN instead of calling recvmsg() eagerly.
    notify_on_read(tcp);
  } else {
    // Not the first read, and data may already be waiting (or the read is
    // urgent): run the read handler right away.
    grpc_core::Closure::Run(DEBUG_LOCATION, &tcp->read_done_closure,
                            GRPC_ERROR_NONE);
  }
}
// A wrapper around sendmsg(): sends msg over fd and returns the number of
// bytes sent, retrying on EINTR.
ssize_t tcp_send(int fd, const struct msghdr* msg, int* saved_errno,
                 int additional_flags = 0) {
  ssize_t sent_length;
  do {
    sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);
  } while (sent_length < 0 && (*saved_errno = errno) == EINTR);
  return sent_length;
}
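// Note the errno capture inside the loop condition above: errno must be
// latched into *saved_errno immediately after sendmsg() returns, before any
// other call (a log statement, a trace-flag check) can clobber it. Callers
// then branch on the saved copy rather than on errno itself.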
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
                                      size_t sending_length,
                                      ssize_t* sent_length, int* saved_errno,
                                      int additional_flags = 0);
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
    grpc_tcp* tcp, grpc_slice_buffer* buf);

#ifdef GRPC_LINUX_ERRQUEUE
static bool process_errors(grpc_tcp* tcp);

static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
    grpc_tcp* tcp, grpc_slice_buffer* buf) {
  TcpZerocopySendRecord* zerocopy_send_record = nullptr;
  const bool use_zerocopy =
      tcp->tcp_zerocopy_send_ctx.enabled() &&
      tcp->tcp_zerocopy_send_ctx.threshold_bytes() < buf->length;
  if (use_zerocopy) {
    zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord();
    if (zerocopy_send_record == nullptr) {
      // All records are in flight: drain the error queue, which may complete
      // some sends and free up a record.
      process_errors(tcp);
      zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord();
    }
    if (zerocopy_send_record != nullptr) {
      zerocopy_send_record->PrepareForSends(buf);
      GPR_DEBUG_ASSERT(buf->count == 0);
      GPR_DEBUG_ASSERT(buf->length == 0);
      tcp->outgoing_byte_idx = 0;
      tcp->outgoing_buffer = nullptr;
    }
  }
  return zerocopy_send_record;
}
static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {
  tcp->tcp_zerocopy_send_ctx.Shutdown();
  while (!tcp->tcp_zerocopy_send_ctx.AllSendRecordsEmpty()) {
    process_errors(tcp);
  }
}
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
                                      size_t sending_length,
                                      ssize_t* sent_length, int* saved_errno,
                                      int additional_flags) {
  if (!tcp->socket_ts_enabled) {
    uint32_t opt = grpc_core::kTimestampingSocketOptions;
    if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
                   static_cast<void*>(&opt), sizeof(opt)) != 0) {
      return false;
    }
    tcp->bytes_counter = -1;
    tcp->socket_ts_enabled = true;
  }
  // Set a control message indicating that we want timestamps for this write.
  union {
    char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
    struct cmsghdr align;
  } u;
  cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SO_TIMESTAMPING;
  cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
  *reinterpret_cast<int*>(CMSG_DATA(cmsg)) =
      grpc_core::kTimestampingRecordingOptions;
  msg->msg_control = u.cmsg_buf;
  msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));

  // If there is an error on sendmsg, the logic in tcp_flush will handle it.
  ssize_t length = tcp_send(tcp->fd, msg, saved_errno, additional_flags);
  *sent_length = length;
  // Only save timestamps if all the bytes were taken by sendmsg.
  if (sending_length == static_cast<size_t>(length)) {
    gpr_mu_lock(&tcp->tb_mu);
    grpc_core::TracedBuffer::AddNewEntry(
        &tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length),
        tcp->fd, tcp->outgoing_buffer_arg);
    gpr_mu_unlock(&tcp->tb_mu);
    tcp->outgoing_buffer_arg = nullptr;
  }
  return true;
}
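// The SO_TIMESTAMPING machinery above yields per-write transmission
// timestamps: each fully-accepted sendmsg() appends a TracedBuffer entry
// keyed by the cumulative byte counter, and the kernel later reports its
// timestamps for that byte offset on the error queue, where
// process_timestamp() below matches them back to the entry.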
static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
                                            TcpZerocopySendRecord* record,
                                            uint32_t seq, const char* tag);

static void process_zerocopy(grpc_tcp* tcp, struct cmsghdr* cmsg) {
  GPR_DEBUG_ASSERT(cmsg);
  auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(cmsg));
  GPR_DEBUG_ASSERT(serr->ee_errno == 0);
  GPR_DEBUG_ASSERT(serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY);
  // The kernel can coalesce completions: [ee_info, ee_data] is an inclusive
  // range of zerocopy sequence numbers that have completed.
  const uint32_t lo = serr->ee_info;
  const uint32_t hi = serr->ee_data;
  for (uint32_t seq = lo; seq <= hi; ++seq) {
    // ReleaseSendRecord gives back the record bound to this sendmsg()
    // sequence number; drop the ref taken in NoteSend().
    TcpZerocopySendRecord* record =
        tcp->tcp_zerocopy_send_ctx.ReleaseSendRecord(seq);
    GPR_DEBUG_ASSERT(record);
    UnrefMaybePutZerocopySendRecord(tcp, record, seq, "CALLBACK RCVD");
  }
}
static bool CmsgIsIpLevel(const cmsghdr& cmsg) {
  return (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR) ||
         (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR);
}

static bool CmsgIsZeroCopy(const cmsghdr& cmsg) {
  if (!CmsgIsIpLevel(cmsg)) {
    return false;
  }
  auto serr = reinterpret_cast<const sock_extended_err*>(CMSG_DATA(&cmsg));
  return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY;
}
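// Zerocopy completions arrive as IP_RECVERR/IPV6_RECVERR control messages
// whose sock_extended_err has ee_errno == 0: the "error queue" here is a
// delivery channel, not an actual failure, which is why ee_errno must be
// checked before treating the message as a real error.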
// Reads a timestamp cmsg and its companion control messages; returns the last
// cmsg consumed so the caller's iteration can continue after it.
struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
                                  struct cmsghdr* cmsg) {
  auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
  cmsghdr* opt_stats = nullptr;
  if (next_cmsg == nullptr) {
    return cmsg;
  }

  // Check whether next_cmsg is an OPT_STATS message.
  if (next_cmsg->cmsg_level == SOL_SOCKET &&
      next_cmsg->cmsg_type == SCM_TIMESTAMPING_OPT_STATS) {
    opt_stats = next_cmsg;
    next_cmsg = CMSG_NXTHDR(msg, opt_stats);
    if (next_cmsg == nullptr) {
      return opt_stats;
    }
  }

  if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
      !(next_cmsg->cmsg_type == IP_RECVERR ||
        next_cmsg->cmsg_type == IPV6_RECVERR)) {
    return cmsg;
  }

  auto tss =
      reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg));
  auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
  if (serr->ee_errno != ENOMSG ||
      serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
    gpr_log(GPR_ERROR, "Unexpected control message");
    return cmsg;
  }
  // Error handling may run on another thread, so the traced-buffer list is
  // protected by a mutex.
  gpr_mu_lock(&tcp->tb_mu);
  grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, opt_stats,
                                            tss);
  gpr_mu_unlock(&tcp->tb_mu);
  return next_cmsg;
}
// Drains the socket error queue. Returns true if any timestamps or zerocopy
// completions were processed.
static bool process_errors(grpc_tcp* tcp) {
  bool processed_err = false;
  struct iovec iov;
  iov.iov_base = nullptr;
  iov.iov_len = 0;
  struct msghdr msg;
  msg.msg_name = nullptr;
  msg.msg_namelen = 0;
  msg.msg_iov = &iov;
  msg.msg_iovlen = 0;
  msg.msg_flags = 0;
  // Allocate enough space that this does not need to grow as the size of
  // OPT_STATS increases.
  constexpr size_t cmsg_alloc_space =
      CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) +
      CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) +
      CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t)));
  // Aligned space for the cmsgs received along with timestamps.
  union {
    char rbuf[cmsg_alloc_space];
    struct cmsghdr align;
  } aligned_buf;
  msg.msg_control = aligned_buf.rbuf;
  int r, saved_errno;
  while (true) {
    msg.msg_controllen = sizeof(aligned_buf.rbuf);
    do {
      r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE);
      saved_errno = errno;
    } while (r < 0 && saved_errno == EINTR);

    if (r == -1 && saved_errno == EAGAIN) {
      return processed_err;  // No more errors to process.
    }
    if (r == -1) {
      return processed_err;
    }

    if (msg.msg_controllen == 0) {
      // No control message found; it was probably spurious.
      return processed_err;
    }
    bool seen = false;
    for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
         cmsg = CMSG_NXTHDR(&msg, cmsg)) {
      if (CmsgIsZeroCopy(*cmsg)) {
        process_zerocopy(tcp, cmsg);
        seen = true;
        processed_err = true;
      } else if (cmsg->cmsg_level == SOL_SOCKET &&
                 cmsg->cmsg_type == SCM_TIMESTAMPING) {
        cmsg = process_timestamp(tcp, &msg, cmsg);
        seen = true;
        processed_err = true;
      } else {
        // A control message we do not know how to handle.
        gpr_log(GPR_INFO,
                "unknown control message cmsg_level:%d cmsg_type:%d",
                cmsg->cmsg_level, cmsg->cmsg_type);
        return processed_err;
      }
    }
    if (!seen) {
      return processed_err;
    }
  }
}
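// Both TX timestamps and zerocopy acknowledgments ride the same MSG_ERRQUEUE
// channel, so a single drain loop serves double duty. This is also why
// tcp_get_send_zerocopy_record() calls process_errors() when the record pool
// is exhausted: consuming queued acks is what returns records to the pool.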
static void tcp_handle_error(void* arg /* grpc_tcp */,
                             grpc_error_handle error) {
  grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
  if (error != GRPC_ERROR_NONE ||
      static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
    // We are not going to register for error notifications anymore, so it is
    // safe to unref.
    TCP_UNREF(tcp, "error-tracking");
    return;
  }

  // We are still interested in collecting timestamps, so try reading them.
  bool processed = process_errors(tcp);
  if (!processed) {
    // This might not have been a timestamps error: mark the fd readable and
    // writable so normal handling can take over.
    grpc_fd_set_readable(tcp->em_fd);
    grpc_fd_set_writable(tcp->em_fd);
  }
  grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
}
#else  /* GRPC_LINUX_ERRQUEUE */
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
    grpc_tcp* /*tcp*/, grpc_slice_buffer* /*buf*/) {
  return nullptr;
}

static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* /*tcp*/) {}

static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/,
                                      size_t /*sending_length*/,
                                      ssize_t* /*sent_length*/,
                                      int* /*saved_errno*/,
                                      int /*additional_flags*/) {
  gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
  GPR_ASSERT(0);
  return false;
}

static void tcp_handle_error(void* /*arg*/ /* grpc_tcp */,
                             grpc_error_handle /*error*/) {
  gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
  GPR_ASSERT(0);
}
#endif /* GRPC_LINUX_ERRQUEUE */
// If outgoing_buffer_arg is set, shut down the traced-buffer list early so
// that any release operations needed can be performed on the arg.
void tcp_shutdown_buffer_list(grpc_tcp* tcp) {
  if (tcp->outgoing_buffer_arg) {
    gpr_mu_lock(&tcp->tb_mu);
    grpc_core::TracedBuffer::Shutdown(
        &tcp->tb_head, tcp->outgoing_buffer_arg,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("TracedBuffer list shutdown"));
    gpr_mu_unlock(&tcp->tb_mu);
    tcp->outgoing_buffer_arg = nullptr;
  }
}

#if defined(IOV_MAX) && IOV_MAX < 260
#define MAX_WRITE_IOVEC IOV_MAX
#else
#define MAX_WRITE_IOVEC 260
#endif
msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
                                                    size_t* unwind_byte_idx,
                                                    size_t* sending_length,
                                                    iovec* iov) {
  msg_iovlen_type iov_size;
  *unwind_slice_idx = out_offset_.slice_idx;
  *unwind_byte_idx = out_offset_.byte_idx;
  for (iov_size = 0;
       out_offset_.slice_idx != buf_.count && iov_size != MAX_WRITE_IOVEC;
       iov_size++) {
    iov[iov_size].iov_base =
        GRPC_SLICE_START_PTR(buf_.slices[out_offset_.slice_idx]) +
        out_offset_.byte_idx;
    iov[iov_size].iov_len =
        GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]) -
        out_offset_.byte_idx;
    *sending_length += iov[iov_size].iov_len;
    ++(out_offset_.slice_idx);
    out_offset_.byte_idx = 0;
  }
  GPR_DEBUG_ASSERT(iov_size > 0);
  return iov_size;
}
void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
                                                     size_t actually_sent) {
  size_t trailing = sending_length - actually_sent;
  while (trailing > 0) {
    size_t slice_length;
    out_offset_.slice_idx--;
    slice_length = GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]);
    if (slice_length > trailing) {
      out_offset_.byte_idx = slice_length - trailing;
      break;
    } else {
      trailing -= slice_length;
    }
  }
}
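// Worked example: suppose PopulateIovs() staged three 100-byte slices
// (sending_length = 300) and sendmsg() accepted only 250 bytes. Then
// trailing = 50; we step slice_idx back from 3 to 2 (a 100-byte slice, and
// slice_length 100 > trailing 50) and set byte_idx = 100 - 50 = 50, so the
// next flush resumes at byte 50 of the third slice.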
// Returns true if the write is fully done (*error is then set), false if it
// must be retried when the fd becomes writable again.
static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
                                  grpc_error_handle* error) {
  msg_iovlen_type iov_size;
  ssize_t sent_length = 0;
  size_t sending_length;
  size_t unwind_slice_idx;
  size_t unwind_byte_idx;
  bool tried_sending_message;
  int saved_errno;
  msghdr msg;
  // iov consumes a large amount of stack; keep it last for locality.
  iovec iov[MAX_WRITE_IOVEC];
  while (true) {
    sending_length = 0;
    iov_size = record->PopulateIovs(&unwind_slice_idx, &unwind_byte_idx,
                                    &sending_length, iov);
    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iov_size;
    msg.msg_flags = 0;
    tried_sending_message = false;
    // Before calling sendmsg (with or without timestamps): take a ref on the
    // zerocopy send record and bind the kernel's sequence number to it.
    tcp->tcp_zerocopy_send_ctx.NoteSend(record);
    saved_errno = 0;
    if (tcp->outgoing_buffer_arg != nullptr) {
      if (!tcp->ts_capable ||
          !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
                                     &saved_errno, MSG_ZEROCOPY)) {
        // We could not set socket options to collect timestamps; fall back
        // to writing without them.
        tcp->ts_capable = false;
        tcp_shutdown_buffer_list(tcp);
      } else {
        tried_sending_message = true;
      }
    }
    if (!tried_sending_message) {
      msg.msg_control = nullptr;
      msg.msg_controllen = 0;
      sent_length = tcp_send(tcp->fd, &msg, &saved_errno, MSG_ZEROCOPY);
    }
    if (sent_length < 0) {
      // This particular send failed: drop the ref taken earlier.
      tcp->tcp_zerocopy_send_ctx.UndoSend();
      if (saved_errno == EAGAIN) {
        record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
        return false;
      } else if (saved_errno == EPIPE) {
        *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
        tcp_shutdown_buffer_list(tcp);
        return true;
      } else {
        *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
        tcp_shutdown_buffer_list(tcp);
        return true;
      }
    }
    tcp->bytes_counter += sent_length;
    record->UpdateOffsetForBytesSent(sending_length,
                                     static_cast<size_t>(sent_length));
    if (record->AllSlicesSent()) {
      *error = GRPC_ERROR_NONE;
      return true;
    }
  }
}
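// Note the EAGAIN-path pairing above: NoteSend() optimistically consumed a
// kernel sequence number and took a ref before sendmsg(); a failed send must
// call UndoSend() to roll both back, while a merely throttled send
// additionally rewinds the slice offsets captured in
// unwind_slice_idx/unwind_byte_idx.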
static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
                                            TcpZerocopySendRecord* record,
                                            uint32_t /*seq*/,
                                            const char* /*tag*/) {
  if (record->Unref()) {
    tcp->tcp_zerocopy_send_ctx.PutSendRecord(record);
  }
}

static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
                               grpc_error_handle* error) {
  bool done = do_tcp_flush_zerocopy(tcp, record, error);
  if (done) {
    // Either we encountered an error or we successfully sent all bytes; in
    // both cases we are done with this record.
    UnrefMaybePutZerocopySendRecord(tcp, record, 0, "flush_done");
  }
  return done;
}
static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {
  struct msghdr msg;
  struct iovec iov[MAX_WRITE_IOVEC];
  msg_iovlen_type iov_size;
  ssize_t sent_length = 0;
  size_t sending_length;
  size_t trailing;
  size_t unwind_slice_idx;
  size_t unwind_byte_idx;
  int saved_errno;

  // We always start at zero, because we eagerly unref and trim the slice
  // buffer as we write.
  size_t outgoing_slice_idx = 0;

  while (true) {
    sending_length = 0;
    unwind_slice_idx = outgoing_slice_idx;
    unwind_byte_idx = tcp->outgoing_byte_idx;
    for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count &&
                       iov_size != MAX_WRITE_IOVEC;
         iov_size++) {
      iov[iov_size].iov_base =
          GRPC_SLICE_START_PTR(
              tcp->outgoing_buffer->slices[outgoing_slice_idx]) +
          tcp->outgoing_byte_idx;
      iov[iov_size].iov_len =
          GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) -
          tcp->outgoing_byte_idx;
      sending_length += iov[iov_size].iov_len;
      outgoing_slice_idx++;
      tcp->outgoing_byte_idx = 0;
    }
    GPR_ASSERT(iov_size > 0);

    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iov_size;
    msg.msg_flags = 0;
    bool tried_sending_message = false;
    saved_errno = 0;
    if (tcp->outgoing_buffer_arg != nullptr) {
      if (!tcp->ts_capable ||
          !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
                                     &saved_errno)) {
        // We could not set socket options to collect timestamps; fall back
        // to writing without them.
        tcp->ts_capable = false;
        tcp_shutdown_buffer_list(tcp);
      } else {
        tried_sending_message = true;
      }
    }
    if (!tried_sending_message) {
      msg.msg_control = nullptr;
      msg.msg_controllen = 0;
      sent_length = tcp_send(tcp->fd, &msg, &saved_errno);
    }

    if (sent_length < 0) {
      if (saved_errno == EAGAIN) {
        tcp->outgoing_byte_idx = unwind_byte_idx;
        // Unref and forget about all slices written up to this point.
        for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
          grpc_slice_buffer_remove_first(tcp->outgoing_buffer);
        }
        return false;
      } else if (saved_errno == EPIPE) {
        *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
        grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
        tcp_shutdown_buffer_list(tcp);
        return true;
      } else {
        *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
        grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
        tcp_shutdown_buffer_list(tcp);
        return true;
      }
    }

    GPR_ASSERT(tcp->outgoing_byte_idx == 0);
    tcp->bytes_counter += sent_length;
    trailing = sending_length - static_cast<size_t>(sent_length);
    while (trailing > 0) {
      size_t slice_length;
      outgoing_slice_idx--;
      slice_length =
          GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]);
      if (slice_length > trailing) {
        tcp->outgoing_byte_idx = slice_length - trailing;
        break;
      } else {
        trailing -= slice_length;
      }
    }
    if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
      *error = GRPC_ERROR_NONE;
      grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
      return true;
    }
  }
}
static void tcp_handle_write(void* arg /* grpc_tcp */,
                             grpc_error_handle error) {
  grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
  grpc_closure* cb;

  if (error != GRPC_ERROR_NONE) {
    cb = tcp->write_cb;
    tcp->write_cb = nullptr;
    if (tcp->current_zerocopy_send != nullptr) {
      UnrefMaybePutZerocopySendRecord(tcp, tcp->current_zerocopy_send, 0,
                                      "handle_write_err");
      tcp->current_zerocopy_send = nullptr;
    }
    grpc_core::Closure::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error));
    TCP_UNREF(tcp, "write");
    return;
  }

  bool flush_result =
      tcp->current_zerocopy_send != nullptr
          ? tcp_flush_zerocopy(tcp, tcp->current_zerocopy_send, &error)
          : tcp_flush(tcp, &error);
  if (!flush_result) {
    // Not all bytes could be written yet: re-arm the write notification and
    // keep the current (zerocopy) state for the retry.
    notify_on_write(tcp);
    GPR_DEBUG_ASSERT(error == GRPC_ERROR_NONE);
  } else {
    cb = tcp->write_cb;
    tcp->write_cb = nullptr;
    tcp->current_zerocopy_send = nullptr;
    grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
    TCP_UNREF(tcp, "write");
  }
}
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
                      grpc_closure* cb, void* arg, int /*max_frame_size*/) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  grpc_error_handle error = GRPC_ERROR_NONE;
  TcpZerocopySendRecord* zerocopy_send_record = nullptr;

  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    size_t i;
    for (i = 0; i < buf->count; i++) {
      gpr_log(GPR_INFO, "WRITE %p (peer=%s)", tcp, tcp->peer_string.c_str());
      if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
        char* data =
            grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
        gpr_log(GPR_DEBUG, "DATA: %s", data);
        gpr_free(data);
      }
    }
  }

  GPR_ASSERT(tcp->write_cb == nullptr);
  GPR_DEBUG_ASSERT(tcp->current_zerocopy_send == nullptr);

  if (buf->length == 0) {
    grpc_core::Closure::Run(
        DEBUG_LOCATION, cb,
        grpc_fd_is_shutdown(tcp->em_fd)
            ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
                                 tcp)
            : GRPC_ERROR_NONE);
    tcp_shutdown_buffer_list(tcp);
    return;
  }

  zerocopy_send_record = tcp_get_send_zerocopy_record(tcp, buf);
  if (zerocopy_send_record == nullptr) {
    // Either not enough bytes for zerocopy, or no free send record: fall
    // back to the copying path.
    tcp->outgoing_buffer = buf;
    tcp->outgoing_byte_idx = 0;
  }
  tcp->outgoing_buffer_arg = arg;
  if (arg) {
    GPR_ASSERT(grpc_event_engine_can_track_errors());
  }

  bool flush_result =
      zerocopy_send_record != nullptr
          ? tcp_flush_zerocopy(tcp, zerocopy_send_record, &error)
          : tcp_flush(tcp, &error);
  if (!flush_result) {
    TCP_REF(tcp, "write");
    tcp->write_cb = cb;
    tcp->current_zerocopy_send = zerocopy_send_record;
    notify_on_write(tcp);
  } else {
    grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
  }
}
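// Design note: the zerocopy/copy decision is made per write in
// tcp_get_send_zerocopy_record(). Small writes stay on the copying path
// because pinning pages and waiting for the error-queue ack costs more than
// a memcpy below threshold_bytes() (default 16KB); large writes amortize
// that overhead and skip the copy entirely.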
static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  grpc_pollset_add_fd(pollset, tcp->em_fd);
}

static void tcp_add_to_pollset_set(grpc_endpoint* ep,
                                   grpc_pollset_set* pollset_set) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
}

static void tcp_delete_from_pollset_set(grpc_endpoint* ep,
                                        grpc_pollset_set* pollset_set) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  grpc_pollset_set_del_fd(pollset_set, tcp->em_fd);
}

static absl::string_view tcp_get_peer(grpc_endpoint* ep) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  return tcp->peer_string;
}

static absl::string_view tcp_get_local_address(grpc_endpoint* ep) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  return tcp->local_address;
}

static int tcp_get_fd(grpc_endpoint* ep) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  return tcp->fd;
}

static bool tcp_can_track_err(grpc_endpoint* ep) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  if (!grpc_event_engine_can_track_errors()) {
    return false;
  }
  struct sockaddr addr;
  socklen_t len = sizeof(addr);
  if (getsockname(tcp->fd, &addr, &len) < 0) {
    return false;
  }
  return addr.sa_family == AF_INET || addr.sa_family == AF_INET6;
}

static const grpc_endpoint_vtable vtable = {tcp_read,
                                            tcp_write,
                                            tcp_add_to_pollset,
                                            tcp_add_to_pollset_set,
                                            tcp_delete_from_pollset_set,
                                            tcp_shutdown,
                                            tcp_destroy,
                                            tcp_get_peer,
                                            tcp_get_local_address,
                                            tcp_get_fd,
                                            tcp_can_track_err};
#define MAX_CHUNK_SIZE (32 * 1024 * 1024)

grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
                               const grpc_channel_args* channel_args,
                               absl::string_view peer_string) {
  static constexpr bool kZerocpTxEnabledDefault = false;
  int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
  int tcp_max_read_chunk_size = 4 * 1024 * 1024;
  int tcp_min_read_chunk_size = 256;
  bool tcp_tx_zerocopy_enabled = kZerocpTxEnabledDefault;
  int tcp_tx_zerocopy_send_bytes_thresh =
      grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold;
  int tcp_tx_zerocopy_max_simult_sends =
      grpc_core::TcpZerocopySendCtx::kDefaultMaxSends;
  if (channel_args != nullptr) {
    for (size_t i = 0; i < channel_args->num_args; i++) {
      if (0 ==
          strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
        grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
        tcp_read_chunk_size =
            grpc_channel_arg_get_integer(&channel_args->args[i], options);
      } else if (0 == strcmp(channel_args->args[i].key,
                             GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) {
        grpc_integer_options options = {tcp_min_read_chunk_size, 1,
                                        MAX_CHUNK_SIZE};
        tcp_min_read_chunk_size =
            grpc_channel_arg_get_integer(&channel_args->args[i], options);
      } else if (0 == strcmp(channel_args->args[i].key,
                             GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) {
        grpc_integer_options options = {tcp_max_read_chunk_size, 1,
                                        MAX_CHUNK_SIZE};
        tcp_max_read_chunk_size =
            grpc_channel_arg_get_integer(&channel_args->args[i], options);
      } else if (0 == strcmp(channel_args->args[i].key,
                             GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) {
        tcp_tx_zerocopy_enabled = grpc_channel_arg_get_bool(
            &channel_args->args[i], kZerocpTxEnabledDefault);
      } else if (0 == strcmp(channel_args->args[i].key,
                             GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD)) {
        grpc_integer_options options = {
            grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold, 0,
            INT_MAX};
        tcp_tx_zerocopy_send_bytes_thresh =
            grpc_channel_arg_get_integer(&channel_args->args[i], options);
      } else if (0 == strcmp(channel_args->args[i].key,
                             GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)) {
        grpc_integer_options options = {
            grpc_core::TcpZerocopySendCtx::kDefaultMaxSends, 0, INT_MAX};
        tcp_tx_zerocopy_max_simult_sends =
            grpc_channel_arg_get_integer(&channel_args->args[i], options);
      }
    }
  }

  if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) {
    tcp_min_read_chunk_size = tcp_max_read_chunk_size;
  }
  tcp_read_chunk_size = grpc_core::Clamp(
      tcp_read_chunk_size, tcp_min_read_chunk_size, tcp_max_read_chunk_size);
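  // For example, a client that knows it streams large messages might bump the
  // zerocopy knobs via channel args (arg names as defined in grpc_types.h):
  //
  //   grpc_arg args[] = {
  //       grpc_channel_arg_integer_create(
  //           const_cast<char*>(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED), 1),
  //       grpc_channel_arg_integer_create(
  //           const_cast<char*>(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD),
  //           64 * 1024)};
  //
  // Integer values are clamped to the ranges above, so inconsistent settings
  // degrade gracefully rather than fail.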
  grpc_tcp* tcp = new grpc_tcp(tcp_tx_zerocopy_max_simult_sends,
                               tcp_tx_zerocopy_send_bytes_thresh);
  tcp->base.vtable = &vtable;
  tcp->peer_string = std::string(peer_string);
  tcp->fd = grpc_fd_wrapped_fd(em_fd);
  tcp->memory_owner = grpc_core::ResourceQuotaFromChannelArgs(channel_args)
                          ->memory_quota()
                          ->CreateMemoryOwner(peer_string);
  tcp->self_reservation = tcp->memory_owner.MakeReservation(sizeof(grpc_tcp));
  grpc_resolved_address resolved_local_addr;
  memset(&resolved_local_addr, 0, sizeof(resolved_local_addr));
  resolved_local_addr.len = sizeof(resolved_local_addr.addr);
  absl::StatusOr<std::string> addr_uri;
  if (getsockname(tcp->fd,
                  reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
                  &resolved_local_addr.len) < 0 ||
      !(addr_uri = grpc_sockaddr_to_uri(&resolved_local_addr)).ok()) {
    tcp->local_address = "";
  } else {
    tcp->local_address = addr_uri.value();
  }
  tcp->read_cb = nullptr;
  tcp->write_cb = nullptr;
  tcp->current_zerocopy_send = nullptr;
  tcp->release_fd_cb = nullptr;
  tcp->release_fd = nullptr;
  tcp->target_length = static_cast<double>(tcp_read_chunk_size);
  tcp->min_read_chunk_size = tcp_min_read_chunk_size;
  tcp->max_read_chunk_size = tcp_max_read_chunk_size;
  tcp->bytes_read_this_round = 0;
  // Will be set to false by the very first endpoint read.
  tcp->is_first_read = true;
  tcp->has_posted_reclaimer = false;
  tcp->bytes_counter = -1;
  tcp->socket_ts_enabled = false;
  tcp->ts_capable = true;
  tcp->outgoing_buffer_arg = nullptr;
  tcp->frame_size_tuning_enabled = ExperimentalTcpFrameSizeTuningEnabled();
  tcp->min_progress_size = 1;
  if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {
#ifdef GRPC_LINUX_ERRQUEUE
    const int enable = 1;
    auto err =
        setsockopt(tcp->fd, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable));
    if (err == 0) {
      tcp->tcp_zerocopy_send_ctx.set_enabled(true);
    } else {
      gpr_log(GPR_ERROR, "Failed to set zerocopy options on the socket.");
    }
#endif
  }
  gpr_mu_init(&tcp->tb_mu);
  tcp->tb_head = nullptr;
  GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp,
                    grpc_schedule_on_exec_ctx);
  if (grpc_event_engine_run_in_background()) {
    // If there is a polling engine always running in the background, there is
    // no need to run the backup poller.
    GRPC_CLOSURE_INIT(&tcp->write_done_closure, tcp_handle_write, tcp,
                      grpc_schedule_on_exec_ctx);
  } else {
    GRPC_CLOSURE_INIT(&tcp->write_done_closure,
                      tcp_drop_uncovered_then_handle_write, tcp,
                      grpc_schedule_on_exec_ctx);
  }
  // Always assume there is something on the queue to read.
  tcp->inq = 1;
#ifdef GRPC_HAVE_TCP_INQ
  int one = 1;
  if (setsockopt(tcp->fd, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) {
    tcp->inq_capable = true;
  } else {
    gpr_log(GPR_DEBUG, "cannot set inq fd=%d errno=%d", tcp->fd, errno);
    tcp->inq_capable = false;
  }
#else
  tcp->inq_capable = false;
#endif /* GRPC_HAVE_TCP_INQ */
  tcp->em_fd = em_fd;
  grpc_slice_buffer_init(&tcp->last_read_buffer);
  if (grpc_event_engine_can_track_errors()) {
    // Grab a ref to tcp so we can safely access the struct when processing
    // errors; unref once we no longer want to track errors separately.
    TCP_REF(tcp, "error-tracking");
    gpr_atm_rel_store(&tcp->stop_error_notification, 0);
    GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
                      grpc_schedule_on_exec_ctx);
    grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
  }
  return &tcp->base;
}
int grpc_tcp_fd(grpc_endpoint* ep) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  GPR_ASSERT(ep->vtable == &vtable);
  return grpc_fd_wrapped_fd(tcp->em_fd);
}

void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
                                     grpc_closure* done) {
  grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
  GPR_ASSERT(ep->vtable == &vtable);
  tcp->release_fd = fd;
  tcp->release_fd_cb = done;
  grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
  if (grpc_event_engine_can_track_errors()) {
    // Stop error notifications before handing the fd back.
    ZerocopyDisableAndWaitForRemaining(tcp);
    gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
    grpc_fd_set_error(tcp->em_fd);
  }
  TCP_UNREF(tcp, "destroy");
}
void grpc_tcp_posix_shutdown() {
  delete g_backup_poller_mu;
  g_backup_poller_mu = nullptr;
}

#endif /* GRPC_POSIX_SOCKET_TCP */