31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36 #include "absl/utility/utility.h"
72 #define INPROC_LOG(...) \
74 if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { \
75 gpr_log(__VA_ARGS__); \
88 uint32_t* outflags,
bool* markfilled);
107 struct inproc_transport {
111 is_client(is_client),
112 state_tracker(is_client ?
"inproc_client" :
"inproc_server",
120 ~inproc_transport() {
138 this->~inproc_transport();
148 const void* server_data);
149 void* accept_stream_data;
150 bool is_closed =
false;
151 struct inproc_transport* other_side;
152 struct inproc_stream* stream_list =
nullptr;
155 struct inproc_stream {
160 ref(
"inproc_init_stream:init");
161 ref(
"inproc_init_stream:list");
163 stream_list_prev =
nullptr;
165 stream_list_next = t->stream_list;
166 if (t->stream_list) {
167 t->stream_list->stream_list_prev =
this;
169 t->stream_list =
this;
174 inproc_transport* st = t->other_side;
176 other_side =
nullptr;
178 ref(
"inproc_init_stream:clt");
181 st->accept_stream_cb, st->accept_stream_data);
182 (*st->accept_stream_cb)(st->accept_stream_data, &st->base,
this);
185 inproc_stream*
cs =
const_cast<inproc_stream*
>(
186 static_cast<const inproc_stream*
>(server_data));
189 ref(
"inproc_init_stream:srv");
194 cs->other_side =
this;
197 if (
cs->write_buffer_initial_md_filled) {
198 (void)fill_in_metadata(
this, &
cs->write_buffer_initial_md,
199 cs->write_buffer_initial_md_flags,
200 &to_read_initial_md, &to_read_initial_md_flags,
201 &to_read_initial_md_filled);
202 deadline =
std::min(deadline,
cs->write_buffer_deadline);
203 cs->write_buffer_initial_md.Clear();
204 cs->write_buffer_initial_md_filled =
false;
206 if (
cs->write_buffer_trailing_md_filled) {
207 (void)fill_in_metadata(
this, &
cs->write_buffer_trailing_md, 0,
208 &to_read_trailing_md,
nullptr,
209 &to_read_trailing_md_filled);
210 cs->write_buffer_trailing_md.Clear();
211 cs->write_buffer_trailing_md_filled =
false;
214 cancel_other_error =
cs->write_buffer_cancel_error;
216 maybe_process_ops_locked(
this, cancel_other_error);
232 #define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
233 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
235 #define STREAM_REF(refs, reason) grpc_stream_ref(refs)
236 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
238 void ref(
const char* reason) {
243 void unref(
const char* reason) {
255 uint32_t to_read_initial_md_flags = 0;
256 bool to_read_initial_md_filled =
false;
258 bool to_read_trailing_md_filled =
false;
259 bool ops_needed =
false;
263 bool write_buffer_initial_md_filled =
false;
264 uint32_t write_buffer_initial_md_flags = 0;
268 bool write_buffer_trailing_md_filled =
false;
271 struct inproc_stream* other_side;
272 bool other_side_closed =
false;
273 bool write_buffer_other_side_closed =
false;
281 bool initial_md_sent =
false;
282 bool trailing_md_sent =
false;
283 bool initial_md_recvd =
false;
284 bool trailing_md_recvd =
false;
289 bool trailing_md_recvd_implicit_only =
false;
299 struct inproc_stream* stream_list_prev;
300 struct inproc_stream* stream_list_next;
306 "INPROC:", is_initial ?
"HDR:" :
"TRL:", is_client ?
"CLI:" :
"SVR:");
319 dst_->Append(
key.as_string_view(),
value.AsOwned(),
323 template <
class T,
class V>
324 void Encode(
T trait, V
value) {
325 dst_->Set(trait,
value);
330 dst_->Set(trait,
value.AsOwned());
341 uint32_t* outflags,
bool* markfilled) {
346 if (outflags !=
nullptr) {
349 if (markfilled !=
nullptr) {
357 CopySink
sink(out_md);
365 inproc_transport*
t =
reinterpret_cast<inproc_transport*
>(gt);
370 void close_stream_locked(inproc_stream* s) {
373 s->write_buffer_initial_md.Clear();
374 s->write_buffer_trailing_md.Clear();
377 inproc_stream*
p =
s->stream_list_prev;
378 inproc_stream*
n =
s->stream_list_next;
380 p->stream_list_next =
n;
382 s->t->stream_list =
n;
385 n->stream_list_prev =
p;
388 s->unref(
"close_stream:list");
391 s->unref(
"close_stream:closing");
396 void close_other_side_locked(inproc_stream* s,
const char* reason) {
397 if (
s->other_side !=
nullptr) {
399 s->to_read_initial_md.Clear();
400 s->to_read_trailing_md.Clear();
402 s->other_side->unref(reason);
403 s->other_side_closed =
true;
404 s->other_side =
nullptr;
405 }
else if (!
s->other_side_closed) {
406 s->write_buffer_other_side_closed =
true;
417 int is_sm =
static_cast<int>(
op ==
s->send_message_op);
418 int is_stm =
static_cast<int>(
op ==
s->send_trailing_md_op);
423 int is_rim =
static_cast<int>(
op ==
s->recv_initial_md_op);
424 int is_rm =
static_cast<int>(
op ==
s->recv_message_op);
425 int is_rtm =
static_cast<int>(
op ==
s->recv_trailing_md_op);
427 if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
437 s->ops_needed =
false;
438 op_state_machine_locked(s,
error);
446 if (!
s->trailing_md_sent) {
448 s->trailing_md_sent =
true;
451 inproc_stream* other =
s->other_side;
453 ? &
s->write_buffer_trailing_md
454 : &other->to_read_trailing_md;
455 bool* destfilled = (other ==
nullptr) ? &
s->write_buffer_trailing_md_filled
456 : &other->to_read_trailing_md_filled;
457 (void)fill_in_metadata(s, &fake_md, 0,
dest,
nullptr, destfilled);
459 if (other !=
nullptr) {
463 maybe_process_ops_locked(other,
error);
468 if (
s->recv_initial_md_op) {
470 if (!
s->t->is_client) {
479 (void)fill_in_metadata(
481 s->recv_initial_md_op->payload->recv_initial_metadata
482 .recv_initial_metadata,
483 s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
489 if (
s->recv_initial_md_op->payload->recv_initial_metadata
490 .trailing_metadata_available !=
nullptr) {
494 *
s->recv_initial_md_op->payload->recv_initial_metadata
495 .trailing_metadata_available =
true;
498 "fail_helper %p scheduling initial-metadata-ready %s %s", s,
503 s->recv_initial_md_op->payload->recv_initial_metadata
504 .recv_initial_metadata_ready,
508 complete_if_batch_end_locked(
509 s,
error,
s->recv_initial_md_op,
510 "fail_helper scheduling recv-initial-metadata-on-complete");
511 s->recv_initial_md_op =
nullptr;
513 if (
s->recv_message_op) {
516 if (
s->recv_message_op->payload->recv_message
517 .call_failed_before_recv_message !=
nullptr) {
518 *
s->recv_message_op->payload->recv_message
519 .call_failed_before_recv_message =
true;
523 s->recv_message_op->payload->recv_message.recv_message_ready,
525 complete_if_batch_end_locked(
526 s,
error,
s->recv_message_op,
527 "fail_helper scheduling recv-message-on-complete");
528 s->recv_message_op =
nullptr;
530 if (
s->send_message_op) {
531 ResetSendMessage(
s->send_message_op);
532 complete_if_batch_end_locked(
533 s,
error,
s->send_message_op,
534 "fail_helper scheduling send-message-on-complete");
535 s->send_message_op =
nullptr;
537 if (
s->send_trailing_md_op) {
538 complete_if_batch_end_locked(
539 s,
error,
s->send_trailing_md_op,
540 "fail_helper scheduling send-trailng-md-on-complete");
541 s->send_trailing_md_op =
nullptr;
543 if (
s->recv_trailing_md_op) {
548 s->recv_trailing_md_op->payload->recv_trailing_metadata
549 .recv_trailing_metadata_ready,
553 complete_if_batch_end_locked(
554 s,
error,
s->recv_trailing_md_op,
555 "fail_helper scheduling recv-trailing-metadata-on-complete");
556 s->recv_trailing_md_op =
nullptr;
558 close_other_side_locked(s,
"fail_helper:other_side");
559 close_stream_locked(s);
572 void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
573 *receiver->recv_message_op->payload->recv_message.recv_message =
574 std::move(*sender->send_message_op->payload->send_message.send_message);
575 *receiver->recv_message_op->payload->recv_message.flags =
576 sender->send_message_op->payload->send_message.flags;
582 receiver->recv_message_op->payload->recv_message.recv_message_ready,
584 complete_if_batch_end_locked(
586 "message_transfer scheduling sender on_complete");
587 complete_if_batch_end_locked(
589 "message_transfer scheduling receiver on_complete");
591 receiver->recv_message_op =
nullptr;
592 sender->send_message_op =
nullptr;
603 bool needs_close =
false;
607 inproc_stream* other =
s->other_side;
620 if (
s->send_message_op && other) {
621 if (other->recv_message_op) {
622 message_transfer_locked(s, other);
624 }
else if (!
s->t->is_client &&
s->trailing_md_sent) {
626 ResetSendMessage(
s->send_message_op);
627 complete_if_batch_end_locked(
629 "op_state_machine scheduling send-message-on-complete case 1");
630 s->send_message_op =
nullptr;
638 if (
s->send_trailing_md_op &&
639 (!
s->send_message_op ||
641 (
s->trailing_md_recvd ||
s->to_read_trailing_md_filled)) ||
642 (!
s->t->is_client && other &&
643 (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
644 other->recv_trailing_md_op)))) {
646 ? &
s->write_buffer_trailing_md
647 : &other->to_read_trailing_md;
648 bool* destfilled = (other ==
nullptr) ? &
s->write_buffer_trailing_md_filled
649 : &other->to_read_trailing_md_filled;
650 if (*destfilled ||
s->trailing_md_sent) {
657 if (!other || !other->closed) {
658 (void)fill_in_metadata(
660 s->send_trailing_md_op->payload->send_trailing_metadata
661 .send_trailing_metadata,
662 0,
dest,
nullptr, destfilled);
664 s->trailing_md_sent =
true;
665 if (
s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
666 *
s->send_trailing_md_op->payload->send_trailing_metadata.sent =
true;
668 if (!
s->t->is_client &&
s->trailing_md_recvd &&
s->recv_trailing_md_op) {
670 "op_state_machine %p scheduling trailing-metadata-ready", s);
673 s->recv_trailing_md_op->payload->recv_trailing_metadata
674 .recv_trailing_metadata_ready,
677 "op_state_machine %p scheduling trailing-md-on-complete", s);
679 s->recv_trailing_md_op->on_complete,
681 s->recv_trailing_md_op =
nullptr;
686 complete_if_batch_end_locked(
688 "op_state_machine scheduling send-trailing-metadata-on-complete");
689 s->send_trailing_md_op =
nullptr;
691 if (
s->recv_initial_md_op) {
692 if (
s->initial_md_recvd) {
697 "op_state_machine %p scheduling on_complete errors for already "
698 "recvd initial md %s",
704 if (
s->to_read_initial_md_filled) {
705 s->initial_md_recvd =
true;
707 s, &
s->to_read_initial_md,
s->to_read_initial_md_flags,
708 s->recv_initial_md_op->payload->recv_initial_metadata
709 .recv_initial_metadata,
710 s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
713 s->recv_initial_md_op->payload->recv_initial_metadata
717 if (
s->recv_initial_md_op->payload->recv_initial_metadata
718 .trailing_metadata_available !=
nullptr) {
719 *
s->recv_initial_md_op->payload->recv_initial_metadata
720 .trailing_metadata_available =
721 (other !=
nullptr && other->send_trailing_md_op !=
nullptr);
723 s->to_read_initial_md.Clear();
724 s->to_read_initial_md_filled =
false;
727 s->recv_initial_md_op->payload->recv_initial_metadata
728 .recv_initial_metadata_ready,
730 complete_if_batch_end_locked(
732 "op_state_machine scheduling recv-initial-metadata-on-complete");
733 s->recv_initial_md_op =
nullptr;
736 if (
s->recv_message_op) {
737 if (other && other->send_message_op) {
738 message_transfer_locked(other, s);
742 if (
s->to_read_trailing_md_filled) {
743 if (
s->trailing_md_recvd) {
744 if (
s->trailing_md_recvd_implicit_only) {
746 "op_state_machine %p already implicitly received trailing "
747 "metadata, so ignoring new trailing metadata from client",
749 s->to_read_trailing_md.Clear();
750 s->to_read_trailing_md_filled =
false;
751 s->trailing_md_recvd_implicit_only =
false;
757 "op_state_machine %p scheduling on_complete errors for already "
758 "recvd trailing md %s",
764 if (
s->recv_message_op !=
nullptr) {
767 s->recv_message_op->payload->recv_message.recv_message->reset();
771 s->recv_message_op->payload->recv_message.recv_message_ready,
773 complete_if_batch_end_locked(
774 s, new_err,
s->recv_message_op,
775 "op_state_machine scheduling recv-message-on-complete");
776 s->recv_message_op =
nullptr;
778 if ((
s->trailing_md_sent ||
s->t->is_client) &&
s->send_message_op) {
781 ResetSendMessage(
s->send_message_op);
782 s->send_message_op->payload->send_message.stream_write_closed =
true;
783 complete_if_batch_end_locked(
784 s, new_err,
s->send_message_op,
785 "op_state_machine scheduling send-message-on-complete case 2");
786 s->send_message_op =
nullptr;
788 if (
s->recv_trailing_md_op !=
nullptr) {
790 s->trailing_md_recvd =
true;
791 fill_in_metadata(s, &
s->to_read_trailing_md, 0,
792 s->recv_trailing_md_op->payload->recv_trailing_metadata
793 .recv_trailing_metadata,
795 s->to_read_trailing_md.Clear();
796 s->to_read_trailing_md_filled =
false;
804 if (
s->t->is_client ||
s->trailing_md_sent) {
807 s->recv_trailing_md_op->payload->recv_trailing_metadata
808 .recv_trailing_metadata_ready,
811 s->recv_trailing_md_op->on_complete,
813 s->recv_trailing_md_op =
nullptr;
814 needs_close =
s->trailing_md_sent;
816 }
else if (!
s->trailing_md_recvd) {
819 "op_state_machine %p has trailing md but not yet waiting for it", s);
822 if (!
s->t->is_client &&
s->trailing_md_sent &&
823 (
s->recv_trailing_md_op !=
nullptr)) {
831 s->recv_trailing_md_op->payload->recv_trailing_metadata
832 .recv_trailing_metadata_ready,
834 complete_if_batch_end_locked(
835 s, new_err,
s->recv_trailing_md_op,
836 "op_state_machine scheduling recv-trailing-md-on-complete");
837 s->trailing_md_recvd =
true;
838 s->recv_trailing_md_op =
nullptr;
841 s->trailing_md_recvd_implicit_only =
true;
843 if (
s->trailing_md_recvd &&
s->recv_message_op) {
847 s->recv_message_op->payload->recv_message.recv_message->reset();
850 s->recv_message_op->payload->recv_message.recv_message_ready,
852 complete_if_batch_end_locked(
853 s, new_err,
s->recv_message_op,
854 "op_state_machine scheduling recv-message-on-complete");
855 s->recv_message_op =
nullptr;
857 if (
s->trailing_md_recvd &&
s->send_message_op &&
s->t->is_client) {
860 ResetSendMessage(
s->send_message_op);
861 complete_if_batch_end_locked(
862 s, new_err,
s->send_message_op,
863 "op_state_machine scheduling send-message-on-complete case 3");
864 s->send_message_op =
nullptr;
866 if (
s->send_message_op ||
s->send_trailing_md_op ||
s->recv_initial_md_op ||
867 s->recv_message_op ||
s->recv_trailing_md_op) {
871 GPR_INFO,
"op_state_machine %p still needs closure %p %p %p %p %p", s,
872 s->send_message_op,
s->send_trailing_md_op,
s->recv_initial_md_op,
873 s->recv_message_op,
s->recv_trailing_md_op);
874 s->ops_needed =
true;
878 close_other_side_locked(s,
"op_state_machine");
879 close_stream_locked(s);
892 inproc_stream* other =
s->other_side;
893 maybe_process_ops_locked(s,
s->cancel_self_error);
896 s->trailing_md_sent =
true;
901 ? &
s->write_buffer_trailing_md
902 : &other->to_read_trailing_md;
903 bool* destfilled = (other ==
nullptr) ? &
s->write_buffer_trailing_md_filled
904 : &other->to_read_trailing_md_filled;
905 (void)fill_in_metadata(s, &cancel_md, 0,
dest,
nullptr, destfilled);
907 if (other !=
nullptr) {
911 maybe_process_ops_locked(other, other->cancel_other_error);
919 if (!
s->t->is_client &&
s->trailing_md_recvd &&
s->recv_trailing_md_op) {
922 s->recv_trailing_md_op->payload->recv_trailing_metadata
923 .recv_trailing_metadata_ready,
925 complete_if_batch_end_locked(
926 s,
s->cancel_self_error,
s->recv_trailing_md_op,
927 "cancel_stream scheduling trailing-md-on-complete");
928 s->recv_trailing_md_op =
nullptr;
932 close_other_side_locked(s,
"cancel_stream:other_side");
933 close_stream_locked(s);
944 inproc_stream*
s =
reinterpret_cast<inproc_stream*
>(gs);
949 if (
op->send_initial_metadata) {
950 log_metadata(
op->payload->send_initial_metadata.send_initial_metadata,
951 s->t->is_client,
true);
953 if (
op->send_trailing_metadata) {
954 log_metadata(
op->payload->send_trailing_metadata.send_trailing_metadata,
955 s->t->is_client,
false);
965 if (on_complete ==
nullptr) {
967 nullptr, grpc_schedule_on_exec_ctx);
970 if (
op->cancel_stream) {
980 s->t->is_client ?
"client" :
"server",
981 op->send_initial_metadata ?
" send_initial_metadata" :
"",
982 op->send_message ?
" send_message" :
"",
983 op->send_trailing_metadata ?
" send_trailing_metadata" :
"",
984 op->recv_initial_metadata ?
" recv_initial_metadata" :
"",
985 op->recv_message ?
" recv_message" :
"",
986 op->recv_trailing_metadata ?
" recv_trailing_metadata" :
"");
989 inproc_stream* other =
s->other_side;
991 (
op->send_initial_metadata ||
op->send_trailing_metadata)) {
992 if (
s->t->is_closed) {
997 ? &
s->write_buffer_initial_md
998 : &other->to_read_initial_md;
999 uint32_t* destflags = (other ==
nullptr)
1000 ? &
s->write_buffer_initial_md_flags
1001 : &other->to_read_initial_md_flags;
1002 bool* destfilled = (other ==
nullptr) ? &
s->write_buffer_initial_md_filled
1003 : &other->to_read_initial_md_filled;
1004 if (*destfilled ||
s->initial_md_sent) {
1009 if (!
s->other_side_closed) {
1010 (void)fill_in_metadata(
1011 s,
op->payload->send_initial_metadata.send_initial_metadata,
1012 op->payload->send_initial_metadata.send_initial_metadata_flags,
1013 dest, destflags, destfilled);
1015 if (
s->t->is_client) {
1017 (other ==
nullptr) ? &
s->write_buffer_deadline : &other->deadline;
1019 *dl,
op->payload->send_initial_metadata.send_initial_metadata
1022 s->initial_md_sent =
true;
1025 maybe_process_ops_locked(other,
error);
1030 (
op->send_message ||
op->send_trailing_metadata ||
1031 op->recv_initial_metadata ||
op->recv_message ||
1032 op->recv_trailing_metadata)) {
1034 if (
op->send_message) {
1035 s->send_message_op =
op;
1037 if (
op->send_trailing_metadata) {
1038 s->send_trailing_md_op =
op;
1040 if (
op->recv_initial_metadata) {
1041 s->recv_initial_md_op =
op;
1043 if (
op->recv_message) {
1044 s->recv_message_op =
op;
1046 if (
op->recv_trailing_metadata) {
1047 s->recv_trailing_md_op =
op;
1058 if ((
op->send_message && other && other->recv_message_op !=
nullptr) ||
1059 (
op->send_trailing_metadata &&
1060 (!
s->send_message_op || (other && other->recv_trailing_md_op))) ||
1061 (
op->recv_initial_metadata &&
s->to_read_initial_md_filled) ||
1062 (
op->recv_message && other && other->send_message_op !=
nullptr) ||
1063 (
s->to_read_trailing_md_filled ||
s->trailing_md_recvd)) {
1064 op_state_machine_locked(s,
error);
1066 s->ops_needed =
true;
1072 if (
op->send_message) {
1073 ResetSendMessage(
op);
1076 if (
op->recv_initial_metadata) {
1077 if (
op->payload->recv_initial_metadata.trailing_metadata_available !=
1083 *
op->payload->recv_initial_metadata.trailing_metadata_available =
1088 "perform_stream_op error %p scheduling initial-metadata-ready %s",
1092 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1095 if (
op->recv_message) {
1098 "perform_stream_op error %p scheduling recv message-ready %s", s,
1100 if (
op->payload->recv_message.call_failed_before_recv_message !=
1102 *
op->payload->recv_message.call_failed_before_recv_message =
true;
1105 op->payload->recv_message.recv_message_ready,
1108 if (
op->recv_trailing_metadata) {
1110 "perform_stream_op error %p scheduling "
1111 "trailing-metadata-ready %s",
1115 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1131 if (!
t->is_closed) {
1132 t->is_closed =
true;
1134 while (
t->stream_list !=
nullptr) {
1146 inproc_transport*
t =
reinterpret_cast<inproc_transport*
>(gt);
1149 if (
op->start_connectivity_watch !=
nullptr) {
1150 t->state_tracker.AddWatcher(
op->start_connectivity_watch_state,
1153 if (
op->stop_connectivity_watch !=
nullptr) {
1154 t->state_tracker.RemoveWatcher(
op->stop_connectivity_watch);
1156 if (
op->set_accept_stream) {
1157 t->accept_stream_cb =
op->set_accept_stream_fn;
1158 t->accept_stream_data =
op->set_accept_stream_user_data;
1160 if (
op->on_consumed) {
1183 inproc_transport*
t =
reinterpret_cast<inproc_transport*
>(gt);
1184 inproc_stream*
s =
reinterpret_cast<inproc_stream*
>(gs);
1186 close_stream_locked(s);
1188 s->~inproc_stream();
1194 inproc_transport*
t =
reinterpret_cast<inproc_transport*
>(gt);
1199 t->other_side->unref();
1220 sizeof(inproc_stream),
"inproc",
1236 inproc_transport* st =
new (
gpr_malloc(
sizeof(*st)))
1237 inproc_transport(&inproc_vtable,
mu,
false);
1238 inproc_transport* ct =
new (
gpr_malloc(
sizeof(*ct)))
1239 inproc_transport(&inproc_vtable,
mu,
true);
1240 st->other_side = ct;
1241 ct->other_side = st;
1250 GRPC_API_TRACE(
"grpc_inproc_channel_create(server=%p, args=%p)", 2,
1267 default_authority_arg.
value.
string =
const_cast<char*
>(
"inproc.authority");
1276 inproc_transports_create(&server_transport, server_args, &client_transport,
1281 server_transport,
nullptr, server_args,
nullptr);
1287 if (!new_channel.ok()) {
1301 nullptr,
status,
"Failed to create client channel");
1303 channel = new_channel->release()->c_ptr();
1318 nullptr,
status,
"Failed to create server channel");