inproc_transport.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
22 
23 #include <stdint.h>
24 
25 #include <algorithm>
26 #include <memory>
27 #include <new>
28 #include <string>
29 #include <utility>
30 
31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36 #include "absl/utility/utility.h"
37 
38 #include <grpc/grpc.h>
40 #include <grpc/status.h>
41 #include <grpc/support/alloc.h>
42 #include <grpc/support/log.h>
43 #include <grpc/support/sync.h>
44 
71 
// Trace-gated logging helper: forwards its arguments unchanged to gpr_log(),
// but only when the grpc_inproc_trace trace flag is enabled. The body is
// wrapped in do { ... } while (0) so that an invocation followed by ';'
// behaves as a single statement.
72 #define INPROC_LOG(...) \
73  do { \
74  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { \
75  gpr_log(__VA_ARGS__); \
76  } \
77  } while (0)
78 
79 namespace {
// Forward declarations: the stream type and the helpers below are declared
// here ahead of their definitions further down in this file.
80 struct inproc_stream;
81 bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error);
82 void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error);
83 void op_state_machine_locked(inproc_stream* s, grpc_error_handle error);
84 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
85  bool is_initial);
// NOTE(review): listing line 87 (the middle of this parameter list) is absent
// from this listing. The definition further down reads `flags` and `out_md`,
// which are presumably declared on that line -- confirm against the real file.
86 void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
88  uint32_t* outflags, bool* markfilled);
89 
90 void ResetSendMessage(grpc_transport_stream_op_batch* batch) {
91  absl::exchange(batch->payload->send_message.send_message, nullptr)->Clear();
92 }
93 
// Mutex shared by the two sides of an in-process transport pair.
// The constructor initializes the mutex and sets the reference count `refs`
// to 2; inproc_transport's destructor drops one of those references and
// destroys/frees this object when gpr_unref() reports that it should.
// NOTE(review): the declaration of `refs` (listing line 104) is absent from
// this listing -- confirm its type against the real file.
94 struct shared_mu {
95  shared_mu() {
96  // Share one lock between both sides since both sides get affected
97  gpr_mu_init(&mu);
98  gpr_ref_init(&refs, 2);
99  }
100 
101  ~shared_mu() { gpr_mu_destroy(&mu); }
102 
103  gpr_mu mu;
105 };
106 
// One side (client or server) of an in-process transport pair. Each side keeps
// a pointer to the shared_mu it locks, a pointer to its peer (`other_side`),
// the callback used to announce new streams, and the head of an intrusive list
// of its streams.
// NOTE(review): several lines of this struct are absent from this listing
// (the tail of the constructor's initializer list, and the declarations of
// `base`, `refs` and `state_tracker`), so the text below is not complete.
107 struct inproc_transport {
108  inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
109  bool is_client)
110  : mu(mu),
111  is_client(is_client),
112  state_tracker(is_client ? "inproc_client" : "inproc_server",
114  base.vtable = vtable;
115  // Start each side of transport with 2 refs since they each have a ref
116  // to the other
117  gpr_ref_init(&refs, 2);
118  }
119 
// Drops this side's reference on the shared mutex; when gpr_unref() returns
// true the shared_mu is destroyed in place and its storage is released with
// gpr_free().
120  ~inproc_transport() {
121  if (gpr_unref(&mu->refs)) {
122  mu->~shared_mu();
123  gpr_free(mu);
124  }
125  }
126 
// Takes one additional reference on this transport.
127  void ref() {
128  INPROC_LOG(GPR_INFO, "ref_transport %p", this);
129  gpr_ref(&refs);
130  }
131 
// Drops one reference. When gpr_unref() returns true the destructor is run
// explicitly and the storage is released with gpr_free().
// NOTE(review): this presumes the object was placement-constructed in
// gpr_malloc'd memory -- the allocation site is not visible here; confirm.
132  void unref() {
133  INPROC_LOG(GPR_INFO, "unref_transport %p", this);
134  if (!gpr_unref(&refs)) {
135  return;
136  }
137  INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
138  this->~inproc_transport();
139  gpr_free(this);
140  }
141 
// Lock shared with the other side of the pair.
143  shared_mu* mu;
145  bool is_client;
// Callback (and its user data) invoked by a newly constructed client-side
// stream on the peer transport to announce the stream to that side.
147  void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
148  const void* server_data);
149  void* accept_stream_data;
150  bool is_closed = false;
151  struct inproc_transport* other_side;
// Head of the doubly linked list of streams on this transport.
152  struct inproc_stream* stream_list = nullptr;
153 };
154 
// One side of an in-process stream. It is constructed in place by the
// stream-initialization entry point and holds: metadata received from the peer
// and not yet consumed (`to_read_*`), data written before the peer stream
// exists (`write_buffer_*`), the currently pending op batches, cancellation
// state, and the intrusive links for the owning transport's stream list.
// NOTE(review): the declarations of `refs`, `arena` and `deadline` are used by
// the code below but fall on lines that are absent from this listing.
155 struct inproc_stream {
// Takes two stream refs, links the stream at the head of t->stream_list under
// the shared lock, then either announces the stream to the peer transport
// (server_data == nullptr) or, when server_data points at the client-side
// stream, cross-links the two streams and drains what the client buffered.
156  inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
157  const void* server_data, grpc_core::Arena* arena)
158  : t(t), refs(refcount), arena(arena) {
159  // Ref this stream right now for ctor and list.
160  ref("inproc_init_stream:init");
161  ref("inproc_init_stream:list");
162 
163  stream_list_prev = nullptr;
164  gpr_mu_lock(&t->mu->mu);
165  stream_list_next = t->stream_list;
166  if (t->stream_list) {
167  t->stream_list->stream_list_prev = this;
168  }
169  t->stream_list = this;
170  gpr_mu_unlock(&t->mu->mu);
171 
172  if (!server_data) {
// No client stream was handed in: take a ref on both transports and invoke
// the peer transport's accept_stream_cb with `this` as its server_data.
173  t->ref();
174  inproc_transport* st = t->other_side;
175  st->ref();
176  other_side = nullptr; // will get filled in soon
177  // Pass the client-side stream address to the server-side for a ref
178  ref("inproc_init_stream:clt"); // ref it now on behalf of server
179  // side to avoid destruction
180  INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
181  st->accept_stream_cb, st->accept_stream_data);
182  (*st->accept_stream_cb)(st->accept_stream_data, &st->base, this);
183  } else {
184  // This is the server-side and is being called through accept_stream_cb
185  inproc_stream* cs = const_cast<inproc_stream*>(
186  static_cast<const inproc_stream*>(server_data));
187  other_side = cs;
188  // Ref the server-side stream on behalf of the client now
189  ref("inproc_init_stream:srv");
190 
191  // Now we are about to affect the other side, so lock the transport
192  // to make sure that it doesn't get destroyed
193  gpr_mu_lock(&t->mu->mu);
194  cs->other_side = this;
195  // Now transfer from the other side's write_buffer if any to the to_read
196  // buffer
197  if (cs->write_buffer_initial_md_filled) {
198  (void)fill_in_metadata(this, &cs->write_buffer_initial_md,
199  cs->write_buffer_initial_md_flags,
200  &to_read_initial_md, &to_read_initial_md_flags,
201  &to_read_initial_md_filled);
// Keep the earlier of this stream's deadline and the buffered one.
202  deadline = std::min(deadline, cs->write_buffer_deadline);
203  cs->write_buffer_initial_md.Clear();
204  cs->write_buffer_initial_md_filled = false;
205  }
206  if (cs->write_buffer_trailing_md_filled) {
207  (void)fill_in_metadata(this, &cs->write_buffer_trailing_md, 0,
208  &to_read_trailing_md, nullptr,
209  &to_read_trailing_md_filled);
210  cs->write_buffer_trailing_md.Clear();
211  cs->write_buffer_trailing_md_filled = false;
212  }
// A cancel error buffered on the client side is moved over to this stream's
// cancel_other_error and processed immediately.
213  if (!GRPC_ERROR_IS_NONE(cs->write_buffer_cancel_error)) {
214  cancel_other_error = cs->write_buffer_cancel_error;
215  cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
216  maybe_process_ops_locked(this, cancel_other_error);
217  }
218 
219  gpr_mu_unlock(&t->mu->mu);
220  }
221  }
222 
// Releases the three stored error handles and drops one transport ref.
223  ~inproc_stream() {
224  GRPC_ERROR_UNREF(write_buffer_cancel_error);
225  GRPC_ERROR_UNREF(cancel_self_error);
226  GRPC_ERROR_UNREF(cancel_other_error);
227 
228  t->unref();
229  }
230 
// When NDEBUG is not defined the reason string is forwarded to
// grpc_stream_ref/grpc_stream_unref; otherwise it is dropped. Both macros are
// #undef'd right after ref()/unref() below.
231 #ifndef NDEBUG
232 #define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
233 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
234 #else
235 #define STREAM_REF(refs, reason) grpc_stream_ref(refs)
236 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
237 #endif
// Takes one stream ref, logging `reason` when inproc tracing is enabled.
238  void ref(const char* reason) {
239  INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
240  STREAM_REF(refs, reason);
241  }
242 
// Drops one stream ref, logging `reason` when inproc tracing is enabled.
243  void unref(const char* reason) {
244  INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
245  STREAM_UNREF(refs, reason);
246  }
247 #undef STREAM_REF
248 #undef STREAM_UNREF
249 
// Owning transport side.
250  inproc_transport* t;
253 
// Metadata delivered by the peer that this side has not consumed yet.
254  grpc_metadata_batch to_read_initial_md{arena};
255  uint32_t to_read_initial_md_flags = 0;
256  bool to_read_initial_md_filled = false;
257  grpc_metadata_batch to_read_trailing_md{arena};
258  bool to_read_trailing_md_filled = false;
// Set when the op state machine left ops pending; maybe_process_ops_locked()
// only re-runs the state machine when this is set or an error is supplied.
259  bool ops_needed = false;
260  // Write buffer used only during gap at init time when client-side
261  // stream is set up but server side stream is not yet set up
262  grpc_metadata_batch write_buffer_initial_md{arena};
263  bool write_buffer_initial_md_filled = false;
264  uint32_t write_buffer_initial_md_flags = 0;
// NOTE(review): the initializer for write_buffer_deadline (listing line 266)
// is absent from this listing.
265  grpc_core::Timestamp write_buffer_deadline =
267  grpc_metadata_batch write_buffer_trailing_md{arena};
268  bool write_buffer_trailing_md_filled = false;
269  grpc_error_handle write_buffer_cancel_error = GRPC_ERROR_NONE;
270 
// Peer stream; nullptr before the peer is linked and again after
// close_other_side_locked() has released it.
271  struct inproc_stream* other_side;
272  bool other_side_closed = false; // won't talk anymore
273  bool write_buffer_other_side_closed = false; // on hold
274 
// Batches whose corresponding op is still pending; nullptr when none.
275  grpc_transport_stream_op_batch* send_message_op = nullptr;
276  grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
277  grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
278  grpc_transport_stream_op_batch* recv_message_op = nullptr;
279  grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;
280 
281  bool initial_md_sent = false;
282  bool trailing_md_sent = false;
283  bool initial_md_recvd = false;
284  bool trailing_md_recvd = false;
285  // The following tracks if the server-side only pretends to have received
286  // trailing metadata since it no longer cares about the RPC. If that is the
287  // case, it is still ok for the client to send trailing metadata (in which
288  // case it will be ignored).
289  bool trailing_md_recvd_implicit_only = false;
290 
291  bool closed = false;
292 
// Error recorded when this side cancelled, and error propagated from the peer.
293  grpc_error_handle cancel_self_error = GRPC_ERROR_NONE;
294  grpc_error_handle cancel_other_error = GRPC_ERROR_NONE;
295 
297 
// `listed` is true while the stream is linked into t->stream_list through the
// prev/next pointers; close_stream_locked() unlinks it and clears the flag.
298  bool listed = true;
299  struct inproc_stream* stream_list_prev;
300  struct inproc_stream* stream_list_next;
301 };
302 
// Logs the entries of `md_batch` at GPR_INFO, one "<prefix><key>: <value>"
// line per entry. The prefix is assembled from "INPROC:", then "HDR:" or
// "TRL:" depending on `is_initial`, then "CLI:" or "SVR:" depending on
// `is_client`.
// NOTE(review): listing lines 305 and 307 (the declaration of `prefix` and the
// call that iterates the batch with the lambda) are absent from this listing.
303 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
304  bool is_initial) {
306  "INPROC:", is_initial ? "HDR:" : "TRL:", is_client ? "CLI:" : "SVR:");
308  gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str());
309  });
310 }
311 
312 namespace {
313 
314 class CopySink {
315  public:
316  explicit CopySink(grpc_metadata_batch* dst) : dst_(dst) {}
317 
318  void Encode(const grpc_core::Slice& key, const grpc_core::Slice& value) {
319  dst_->Append(key.as_string_view(), value.AsOwned(),
320  [](absl::string_view, const grpc_core::Slice&) {});
321  }
322 
323  template <class T, class V>
324  void Encode(T trait, V value) {
325  dst_->Set(trait, value);
326  }
327 
328  template <class T>
329  void Encode(T trait, const grpc_core::Slice& value) {
330  dst_->Set(trait, value.AsOwned());
331  }
332 
333  private:
334  grpc_metadata_batch* dst_;
335 };
336 
337 } // namespace
338 
// Copies `metadata` into `out_md`: the destination is cleared first and then
// repopulated by encoding the source batch through a CopySink.
// Before the copy, `*outflags` receives `flags` when `outflags` is non-null and
// `*markfilled` is set to true when `markfilled` is non-null. The metadata is
// also passed to log_metadata(), with `outflags != nullptr` used as the
// "is_initial" argument.
// NOTE(review): listing lines 340 and 342 are absent from this listing; they
// presumably declare the `flags` / `out_md` parameters and the condition that
// guards the log_metadata() call -- confirm against the real file.
339 void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
341  uint32_t* outflags, bool* markfilled) {
343  log_metadata(metadata, s->t->is_client, outflags != nullptr);
344  }
345 
346  if (outflags != nullptr) {
347  *outflags = flags;
348  }
349  if (markfilled != nullptr) {
350  *markfilled = true;
351  }
352 
353  // TODO(ctiller): copy the metadata batch, don't rely on a bespoke copy
354  // function. Can only do this once mdelems are out of the way though, too
355  // many edge cases otherwise.
356  out_md->Clear();
357  CopySink sink(out_md);
358  metadata->Encode(&sink);
359 }
360 
// Stream-initialization entry point: reinterprets `gt` as an inproc_transport
// and placement-constructs an inproc_stream in the storage `gs` points at.
// Always returns 0.
// NOTE(review): the first line of this function's signature (listing line 361)
// and listing line 363 are absent from this listing; the name "init_stream" is
// taken from the log message below, and `gt`, `gs` and `arena` are presumably
// parameters declared on the missing lines -- confirm against the real file.
362  grpc_stream_refcount* refcount, const void* server_data,
364  INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
365  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
366  new (gs) inproc_stream(t, refcount, server_data, arena);
367  return 0; // return value is not important
368 }
369 
370 void close_stream_locked(inproc_stream* s) {
371  if (!s->closed) {
372  // Release the metadata that we would have written out
373  s->write_buffer_initial_md.Clear();
374  s->write_buffer_trailing_md.Clear();
375 
376  if (s->listed) {
377  inproc_stream* p = s->stream_list_prev;
378  inproc_stream* n = s->stream_list_next;
379  if (p != nullptr) {
380  p->stream_list_next = n;
381  } else {
382  s->t->stream_list = n;
383  }
384  if (n != nullptr) {
385  n->stream_list_prev = p;
386  }
387  s->listed = false;
388  s->unref("close_stream:list");
389  }
390  s->closed = true;
391  s->unref("close_stream:closing");
392  }
393 }
394 
395 // This function means that we are done talking/listening to the other side
396 void close_other_side_locked(inproc_stream* s, const char* reason) {
397  if (s->other_side != nullptr) {
398  // First release the metadata that came from the other side's arena
399  s->to_read_initial_md.Clear();
400  s->to_read_trailing_md.Clear();
401 
402  s->other_side->unref(reason);
403  s->other_side_closed = true;
404  s->other_side = nullptr;
405  } else if (!s->other_side_closed) {
406  s->write_buffer_other_side_closed = true;
407  }
408 }
409 
// Each of the five pending-op slots on `s` is compared against `op`; the body
// of the final `if` runs only when exactly one slot still refers to `op`,
// i.e. the op being finished is the last pending piece of that batch.
// NOTE(review): listing line 415 (the `op` parameter) and lines 429-431 (the
// rest of the log call and whatever schedules the closure) are absent from
// this listing, so what happens with `error` is not visible here.
410 // Call the on_complete closure associated with this stream_op_batch if
411 // this stream_op_batch is only one of the pending operations for this
412 // stream. This is called when one of the pending operations for the stream
413 // is done and about to be NULLed out
414 void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error,
416  const char* msg) {
417  int is_sm = static_cast<int>(op == s->send_message_op);
418  int is_stm = static_cast<int>(op == s->send_trailing_md_op);
419  // TODO(vjpai): We should not consider the recv ops here, since they
420  // have their own callbacks. We should invoke a batch's on_complete
421  // as soon as all of the batch's send ops are complete, even if there
422  // are still recv ops pending.
423  int is_rim = static_cast<int>(op == s->recv_initial_md_op);
424  int is_rm = static_cast<int>(op == s->recv_message_op);
425  int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
426 
427  if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
428  INPROC_LOG(GPR_INFO, "%s %p %p %s", msg, s, op,
432  }
433 }
434 
435 void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) {
436  if (s && (!GRPC_ERROR_IS_NONE(error) || s->ops_needed)) {
437  s->ops_needed = false;
438  op_state_machine_locked(s, error);
439  }
440 }
441 
// Fails stream `s` with `error`. In order:
//  1. If trailing metadata has not been sent yet, a freshly constructed
//     metadata batch is written to the peer's to_read_trailing_md (or to this
//     stream's write buffer when no peer is linked) and the error is recorded
//     as the peer's cancel_other_error / this stream's
//     write_buffer_cancel_error if that slot is still empty.
//  2. Every pending op (recv initial md, recv message, send message, send
//     trailing md, recv trailing md) is finished through
//     complete_if_batch_end_locked() and its slot is cleared.
//  3. The other side and the stream itself are closed.
// NOTE(review): many lines of this function are absent from this listing
// (e.g. 469, 475, 485, 487, 497-502, 515, 521-524, 545-552, 561), including
// the declaration of `err` and the statements that schedule the recv-ready
// closures, so the ownership of `error` cannot be verified from here.
442 void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
443  INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
444  // If we're failing this side, we need to make sure that
445  // we also send or have already sent trailing metadata
446  if (!s->trailing_md_sent) {
447  // Send trailing md to the other side indicating cancellation
448  s->trailing_md_sent = true;
449 
450  grpc_metadata_batch fake_md(s->arena);
451  inproc_stream* other = s->other_side;
452  grpc_metadata_batch* dest = (other == nullptr)
453  ? &s->write_buffer_trailing_md
454  : &other->to_read_trailing_md;
455  bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
456  : &other->to_read_trailing_md_filled;
457  (void)fill_in_metadata(s, &fake_md, 0, dest, nullptr, destfilled);
458 
459  if (other != nullptr) {
460  if (GRPC_ERROR_IS_NONE(other->cancel_other_error)) {
461  other->cancel_other_error = GRPC_ERROR_REF(error);
462  }
463  maybe_process_ops_locked(other, error);
464  } else if (GRPC_ERROR_IS_NONE(s->write_buffer_cancel_error)) {
465  s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
466  }
467  }
468  if (s->recv_initial_md_op) {
470  if (!s->t->is_client) {
471  // If this is a server, provide initial metadata with a path and
472  // authority since it expects that as well as no error yet
473  grpc_metadata_batch fake_md(s->arena);
474  fake_md.Set(grpc_core::HttpPathMetadata(),
476  fake_md.Set(grpc_core::HttpAuthorityMetadata(),
477  grpc_core::Slice::FromStaticString("inproc-fail"));
478 
479  (void)fill_in_metadata(
480  s, &fake_md, 0,
481  s->recv_initial_md_op->payload->recv_initial_metadata
482  .recv_initial_metadata,
483  s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
484  nullptr);
486  } else {
488  }
489  if (s->recv_initial_md_op->payload->recv_initial_metadata
490  .trailing_metadata_available != nullptr) {
491  // Set to true unconditionally, because we're failing the call, so even
492  // if we haven't actually seen the send_trailing_metadata op from the
493  // other side, we're going to return trailing metadata anyway.
494  *s->recv_initial_md_op->payload->recv_initial_metadata
495  .trailing_metadata_available = true;
496  }
498  "fail_helper %p scheduling initial-metadata-ready %s %s", s,
503  s->recv_initial_md_op->payload->recv_initial_metadata
504  .recv_initial_metadata_ready,
505  err);
506  // Last use of err so no need to REF and then UNREF it
507 
508  complete_if_batch_end_locked(
509  s, error, s->recv_initial_md_op,
510  "fail_helper scheduling recv-initial-metadata-on-complete");
511  s->recv_initial_md_op = nullptr;
512  }
513  if (s->recv_message_op) {
514  INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s,
// Report to the receiver, when it asked for it, that the call failed before
// a message could be delivered.
516  if (s->recv_message_op->payload->recv_message
517  .call_failed_before_recv_message != nullptr) {
518  *s->recv_message_op->payload->recv_message
519  .call_failed_before_recv_message = true;
520  }
523  s->recv_message_op->payload->recv_message.recv_message_ready,
525  complete_if_batch_end_locked(
526  s, error, s->recv_message_op,
527  "fail_helper scheduling recv-message-on-complete");
528  s->recv_message_op = nullptr;
529  }
530  if (s->send_message_op) {
// The unsent message is dropped before the send op is completed.
531  ResetSendMessage(s->send_message_op);
532  complete_if_batch_end_locked(
533  s, error, s->send_message_op,
534  "fail_helper scheduling send-message-on-complete");
535  s->send_message_op = nullptr;
536  }
537  if (s->send_trailing_md_op) {
538  complete_if_batch_end_locked(
539  s, error, s->send_trailing_md_op,
540  "fail_helper scheduling send-trailng-md-on-complete");
541  s->send_trailing_md_op = nullptr;
542  }
543  if (s->recv_trailing_md_op) {
544  INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %s",
548  s->recv_trailing_md_op->payload->recv_trailing_metadata
549  .recv_trailing_metadata_ready,
551  INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %s",
553  complete_if_batch_end_locked(
554  s, error, s->recv_trailing_md_op,
555  "fail_helper scheduling recv-trailing-metadata-on-complete");
556  s->recv_trailing_md_op = nullptr;
557  }
558  close_other_side_locked(s, "fail_helper:other_side");
559  close_stream_locked(s);
560 
562 }
563 
// Hands the sender's pending message to the receiver: the send_message
// contents are moved into the receiver's recv_message slot and the send flags
// are copied across. Both batches are then offered to
// complete_if_batch_end_locked() with GRPC_ERROR_NONE, and both pending-op
// slots are cleared. Callers must ensure sender->send_message_op and
// receiver->recv_message_op are non-null; both are dereferenced unconditionally.
// NOTE(review): listing lines 580, 581 and 583 (the statement that schedules
// recv_message_ready) are absent from this listing.
564 // TODO(vjpai): It should not be necessary to drain the incoming byte
565 // stream and create a new one; instead, we should simply pass the byte
566 // stream from the sender directly to the receiver as-is.
567 //
568 // Note that fixing this will also avoid the assumption in this code
569 // that the incoming byte stream's next() call will always return
570 // synchronously. That assumption is true today but may not always be
571 // true in the future.
572 void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
573  *receiver->recv_message_op->payload->recv_message.recv_message =
574  std::move(*sender->send_message_op->payload->send_message.send_message);
575  *receiver->recv_message_op->payload->recv_message.flags =
576  sender->send_message_op->payload->send_message.flags;
577 
578  INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
579  receiver);
582  receiver->recv_message_op->payload->recv_message.recv_message_ready,
584  complete_if_batch_end_locked(
585  sender, GRPC_ERROR_NONE, sender->send_message_op,
586  "message_transfer scheduling sender on_complete");
587  complete_if_batch_end_locked(
588  receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
589  "message_transfer scheduling receiver on_complete");
590 
591  receiver->recv_message_op = nullptr;
592  sender->send_message_op = nullptr;
593 }
594 
// Core state machine for a stream's pending ops. Each call tries, in order, to:
//  - fail the stream if a self-cancel, peer-cancel or caller-supplied error is
//    present (cancellation takes precedence over everything else);
//  - match a pending send_message with the peer's pending recv_message;
//  - deliver pending trailing metadata to the peer (or to the write buffer);
//  - satisfy a pending recv_initial_metadata from to_read_initial_md;
//  - match a pending recv_message with the peer's pending send_message;
//  - consume to_read_trailing_md and wind down ops that can no longer be
//    satisfied.
// If any op slot is still occupied at the end, `ops_needed` is set so that a
// later maybe_process_ops_locked() call re-runs this function. When
// `needs_close` was set along the way, the other side and the stream are
// closed at `done:`.
// NOTE(review): a number of lines are absent from this listing, including the
// declaration of `new_err` (listing line 601) and most of the statements that
// schedule closures (e.g. 669-680, 725-729, 745, 769-772, 805-812, 829-830,
// 848-851).
595 void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
596  // This function gets called when we have contents in the unprocessed reads
597  // Get what we want based on our ops wanted
598  // Schedule our appropriate closures
599  // and then return to ops_needed state if still needed
600 
602 
603  bool needs_close = false;
604 
605  INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
606  // cancellation takes precedence
607  inproc_stream* other = s->other_side;
608 
609  if (!GRPC_ERROR_IS_NONE(s->cancel_self_error)) {
610  fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
611  goto done;
612  } else if (!GRPC_ERROR_IS_NONE(s->cancel_other_error)) {
613  fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
614  goto done;
615  } else if (!GRPC_ERROR_IS_NONE(error)) {
616  fail_helper_locked(s, GRPC_ERROR_REF(error));
617  goto done;
618  }
619 
620  if (s->send_message_op && other) {
621  if (other->recv_message_op) {
622  message_transfer_locked(s, other);
623  maybe_process_ops_locked(other, GRPC_ERROR_NONE);
624  } else if (!s->t->is_client && s->trailing_md_sent) {
625  // A server send will never be matched if the server already sent status
626  ResetSendMessage(s->send_message_op);
627  complete_if_batch_end_locked(
628  s, GRPC_ERROR_NONE, s->send_message_op,
629  "op_state_machine scheduling send-message-on-complete case 1");
630  s->send_message_op = nullptr;
631  }
632  }
633  // Pause a send trailing metadata if there is still an outstanding
634  // send message unless we know that the send message will never get
635  // matched to a receive. This happens on the client if the server has
636  // already sent status or on the server if the client has requested
637  // status
638  if (s->send_trailing_md_op &&
639  (!s->send_message_op ||
640  (s->t->is_client &&
641  (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
642  (!s->t->is_client && other &&
643  (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
644  other->recv_trailing_md_op)))) {
// Destination is the peer's read buffer, or this stream's own write buffer
// when no peer is linked.
645  grpc_metadata_batch* dest = (other == nullptr)
646  ? &s->write_buffer_trailing_md
647  : &other->to_read_trailing_md;
648  bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
649  : &other->to_read_trailing_md_filled;
650  if (*destfilled || s->trailing_md_sent) {
651  // The buffer is already in use; that's an error!
652  INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
653  new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
654  fail_helper_locked(s, GRPC_ERROR_REF(new_err));
655  goto done;
656  } else {
// The metadata is only copied across when the peer is absent or still open.
657  if (!other || !other->closed) {
658  (void)fill_in_metadata(
659  s,
660  s->send_trailing_md_op->payload->send_trailing_metadata
661  .send_trailing_metadata,
662  0, dest, nullptr, destfilled);
663  }
664  s->trailing_md_sent = true;
665  if (s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
666  *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
667  }
668  if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
670  "op_state_machine %p scheduling trailing-metadata-ready", s);
673  s->recv_trailing_md_op->payload->recv_trailing_metadata
674  .recv_trailing_metadata_ready,
677  "op_state_machine %p scheduling trailing-md-on-complete", s);
679  s->recv_trailing_md_op->on_complete,
681  s->recv_trailing_md_op = nullptr;
682  needs_close = true;
683  }
684  }
685  maybe_process_ops_locked(other, GRPC_ERROR_NONE);
686  complete_if_batch_end_locked(
687  s, GRPC_ERROR_NONE, s->send_trailing_md_op,
688  "op_state_machine scheduling send-trailing-metadata-on-complete");
689  s->send_trailing_md_op = nullptr;
690  }
691  if (s->recv_initial_md_op) {
// A second recv_initial_metadata on the same stream is treated as an error.
692  if (s->initial_md_recvd) {
693  new_err =
694  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
695  INPROC_LOG(
696  GPR_INFO,
697  "op_state_machine %p scheduling on_complete errors for already "
698  "recvd initial md %s",
699  s, grpc_error_std_string(new_err).c_str());
700  fail_helper_locked(s, GRPC_ERROR_REF(new_err));
701  goto done;
702  }
703 
704  if (s->to_read_initial_md_filled) {
705  s->initial_md_recvd = true;
706  fill_in_metadata(
707  s, &s->to_read_initial_md, s->to_read_initial_md_flags,
708  s->recv_initial_md_op->payload->recv_initial_metadata
709  .recv_initial_metadata,
710  s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
711  nullptr);
// A finite deadline is surfaced to the receiver as GrpcTimeoutMetadata.
712  if (s->deadline != grpc_core::Timestamp::InfFuture()) {
713  s->recv_initial_md_op->payload->recv_initial_metadata
714  .recv_initial_metadata->Set(grpc_core::GrpcTimeoutMetadata(),
715  s->deadline);
716  }
717  if (s->recv_initial_md_op->payload->recv_initial_metadata
718  .trailing_metadata_available != nullptr) {
719  *s->recv_initial_md_op->payload->recv_initial_metadata
720  .trailing_metadata_available =
721  (other != nullptr && other->send_trailing_md_op != nullptr);
722  }
723  s->to_read_initial_md.Clear();
724  s->to_read_initial_md_filled = false;
727  s->recv_initial_md_op->payload->recv_initial_metadata
728  .recv_initial_metadata_ready,
730  complete_if_batch_end_locked(
731  s, GRPC_ERROR_NONE, s->recv_initial_md_op,
732  "op_state_machine scheduling recv-initial-metadata-on-complete");
733  s->recv_initial_md_op = nullptr;
734  }
735  }
736  if (s->recv_message_op) {
737  if (other && other->send_message_op) {
738  message_transfer_locked(other, s);
739  maybe_process_ops_locked(other, GRPC_ERROR_NONE);
740  }
741  }
742  if (s->to_read_trailing_md_filled) {
743  if (s->trailing_md_recvd) {
744  if (s->trailing_md_recvd_implicit_only) {
746  "op_state_machine %p already implicitly received trailing "
747  "metadata, so ignoring new trailing metadata from client",
748  s);
749  s->to_read_trailing_md.Clear();
750  s->to_read_trailing_md_filled = false;
751  s->trailing_md_recvd_implicit_only = false;
752  } else {
753  new_err =
754  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
755  INPROC_LOG(
756  GPR_INFO,
757  "op_state_machine %p scheduling on_complete errors for already "
758  "recvd trailing md %s",
759  s, grpc_error_std_string(new_err).c_str());
760  fail_helper_locked(s, GRPC_ERROR_REF(new_err));
761  goto done;
762  }
763  }
764  if (s->recv_message_op != nullptr) {
765  // This message needs to be wrapped up because it will never be
766  // satisfied
767  s->recv_message_op->payload->recv_message.recv_message->reset();
768  INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
771  s->recv_message_op->payload->recv_message.recv_message_ready,
773  complete_if_batch_end_locked(
774  s, new_err, s->recv_message_op,
775  "op_state_machine scheduling recv-message-on-complete");
776  s->recv_message_op = nullptr;
777  }
778  if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
779  // Nothing further will try to receive from this stream, so finish off
780  // any outstanding send_message op
781  ResetSendMessage(s->send_message_op);
782  s->send_message_op->payload->send_message.stream_write_closed = true;
783  complete_if_batch_end_locked(
784  s, new_err, s->send_message_op,
785  "op_state_machine scheduling send-message-on-complete case 2");
786  s->send_message_op = nullptr;
787  }
788  if (s->recv_trailing_md_op != nullptr) {
789  // We wanted trailing metadata and we got it
790  s->trailing_md_recvd = true;
791  fill_in_metadata(s, &s->to_read_trailing_md, 0,
792  s->recv_trailing_md_op->payload->recv_trailing_metadata
793  .recv_trailing_metadata,
794  nullptr, nullptr);
795  s->to_read_trailing_md.Clear();
796  s->to_read_trailing_md_filled = false;
797 
798  // We should schedule the recv_trailing_md_op completion if
799  // 1. this stream is the client-side
800  // 2. this stream is the server-side AND has already sent its trailing md
801  // (If the server hasn't already sent its trailing md, it doesn't
802  // have
803  // a final status, so don't mark this op complete)
804  if (s->t->is_client || s->trailing_md_sent) {
807  s->recv_trailing_md_op->payload->recv_trailing_metadata
808  .recv_trailing_metadata_ready,
811  s->recv_trailing_md_op->on_complete,
813  s->recv_trailing_md_op = nullptr;
// The stream is closed at `done:` only if this side has also sent its own
// trailing metadata.
814  needs_close = s->trailing_md_sent;
815  }
816  } else if (!s->trailing_md_recvd) {
817  INPROC_LOG(
818  GPR_INFO,
819  "op_state_machine %p has trailing md but not yet waiting for it", s);
820  }
821  }
822  if (!s->t->is_client && s->trailing_md_sent &&
823  (s->recv_trailing_md_op != nullptr)) {
824  // In this case, we don't care to receive the write-close from the client
825  // because we have already sent status and the RPC is over as far as we
826  // are concerned.
827  INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %s",
828  s, grpc_error_std_string(new_err).c_str());
831  s->recv_trailing_md_op->payload->recv_trailing_metadata
832  .recv_trailing_metadata_ready,
833  GRPC_ERROR_REF(new_err));
834  complete_if_batch_end_locked(
835  s, new_err, s->recv_trailing_md_op,
836  "op_state_machine scheduling recv-trailing-md-on-complete");
837  s->trailing_md_recvd = true;
838  s->recv_trailing_md_op = nullptr;
839  // Since we are only pretending to have received the trailing MD, it would
840  // be ok (not an error) if the client actually sends it later.
841  s->trailing_md_recvd_implicit_only = true;
842  }
843  if (s->trailing_md_recvd && s->recv_message_op) {
844  // No further message will come on this stream, so finish off the
845  // recv_message_op
846  INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
847  s->recv_message_op->payload->recv_message.recv_message->reset();
850  s->recv_message_op->payload->recv_message.recv_message_ready,
852  complete_if_batch_end_locked(
853  s, new_err, s->recv_message_op,
854  "op_state_machine scheduling recv-message-on-complete");
855  s->recv_message_op = nullptr;
856  }
857  if (s->trailing_md_recvd && s->send_message_op && s->t->is_client) {
858  // Nothing further will try to receive from this stream, so finish off
859  // any outstanding send_message op
860  ResetSendMessage(s->send_message_op);
861  complete_if_batch_end_locked(
862  s, new_err, s->send_message_op,
863  "op_state_machine scheduling send-message-on-complete case 3");
864  s->send_message_op = nullptr;
865  }
866  if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
867  s->recv_message_op || s->recv_trailing_md_op) {
868  // Didn't get the item we wanted so we still need to get
869  // rescheduled
870  INPROC_LOG(
871  GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
872  s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
873  s->recv_message_op, s->recv_trailing_md_op);
874  s->ops_needed = true;
875  }
// Common exit: every `goto done` above lands here, so the close (when
// requested) and the release of new_err happen on all paths.
876 done:
877  if (needs_close) {
878  close_other_side_locked(s, "op_state_machine");
879  close_stream_locked(s);
880  }
881  GRPC_ERROR_UNREF(new_err);
882 }
883 
// Cancels stream `s` with `error`.
// If the stream has no self-cancel recorded yet, this call: stores a ref to
// `error` in cancel_self_error, runs the op state machine with that error,
// marks trailing metadata as sent and writes a freshly constructed metadata
// batch to the peer (or to the write buffer when no peer is linked),
// propagates the error to the peer, and finishes a server-side
// recv_trailing_md op that had been waiting for trailing metadata to go out.
// Whether or not the cancel was newly recorded, the other side and the stream
// are closed before returning.
// Returns true only when this call recorded the cancel (i.e. it was the first).
// NOTE(review): listing lines 887, 920, 921 and 935 are absent from this
// listing (the rest of the log call, the start of the closure-scheduling
// statement, and one line before the return).
884 bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
885  bool ret = false; // was the cancel accepted
886  INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s,
888  if (GRPC_ERROR_IS_NONE(s->cancel_self_error)) {
889  ret = true;
890  s->cancel_self_error = GRPC_ERROR_REF(error);
891  // Catch current value of other before it gets closed off
892  inproc_stream* other = s->other_side;
893  maybe_process_ops_locked(s, s->cancel_self_error);
894  // Send trailing md to the other side indicating cancellation, even if we
895  // already have
896  s->trailing_md_sent = true;
897 
898  grpc_metadata_batch cancel_md(s->arena);
899 
900  grpc_metadata_batch* dest = (other == nullptr)
901  ? &s->write_buffer_trailing_md
902  : &other->to_read_trailing_md;
903  bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
904  : &other->to_read_trailing_md_filled;
905  (void)fill_in_metadata(s, &cancel_md, 0, dest, nullptr, destfilled);
906 
907  if (other != nullptr) {
// Only the first error reaching the peer is kept as its cancel_other_error.
908  if (GRPC_ERROR_IS_NONE(other->cancel_other_error)) {
909  other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
910  }
911  maybe_process_ops_locked(other, other->cancel_other_error);
912  } else if (GRPC_ERROR_IS_NONE(s->write_buffer_cancel_error)) {
913  s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
914  }
915 
916  // if we are a server and already received trailing md but
917  // couldn't complete that because we hadn't yet sent out trailing
918  // md, now's the chance
919  if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
922  s->recv_trailing_md_op->payload->recv_trailing_metadata
923  .recv_trailing_metadata_ready,
924  GRPC_ERROR_REF(s->cancel_self_error));
925  complete_if_batch_end_locked(
926  s, s->cancel_self_error, s->recv_trailing_md_op,
927  "cancel_stream scheduling trailing-md-on-complete");
928  s->recv_trailing_md_op = nullptr;
929  }
930  }
931 
932  close_other_side_locked(s, "cancel_stream:other_side");
933  close_stream_locked(s);
934 
936  return ret;
937 }
938 
// No-op closure callback that ignores both arguments. perform_stream_op
// installs it as a stand-in on_complete when a batch arrives without one.
939 void do_nothing(void* /*arg*/, grpc_error_handle /*error*/) {}
940 
// perform_stream_op: handles one stream op batch `op` for inproc stream `gs`.
//
// Visible flow, all under the mutex saved in `mu`:
//   1. Log any send_initial/send_trailing metadata carried by the batch.
//   2. Ensure there is an on_complete closure (a do_nothing stand-in if the
//      batch has none).
//   3. If the batch cancels the stream, call cancel_stream_locked; otherwise,
//      if the stream was already self-cancelled, fail the batch with that
//      error.
//   4. For send_initial_metadata, hand the metadata (and, on a client, the
//      deadline) to the peer or to this stream's write buffer.
//   5. With no error and any other op present, record the op on the stream
//      and either run op_state_machine_locked now or set ops_needed.
//   6. Otherwise, on error, fail the batch's receive callbacks directly.
//
// NOTE(review): this listing drops several lines (for example 941-942, 948,
// 958 and the bodies of the error-path scheduling calls), so the function
// signature, the declaration of `error`, and the calls that actually schedule
// the closures are not visible here -- confirm against the real source.
943  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
944  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
945  gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed
946  gpr_mu_lock(mu);
947 
949  if (op->send_initial_metadata) {
950  log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
951  s->t->is_client, true);
952  }
953  if (op->send_trailing_metadata) {
954  log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
955  s->t->is_client, false);
956  }
957  }
959  grpc_closure* on_complete = op->on_complete;
960  // TODO(roth): This is a hack needed because we use data inside of the
961  // closure itself to do the barrier calculation (i.e., to ensure that
962  // we don't schedule the closure until all ops in the batch have been
963  // completed). This can go away once we move to a new C++ closure API
964  // that provides the ability to create a barrier closure.
965  if (on_complete == nullptr) {
966  on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
967  nullptr, grpc_schedule_on_exec_ctx);
968  }
969 
970  if (op->cancel_stream) {
971  // Call cancel_stream_locked without ref'ing the cancel_error because
972  // this function is responsible to make sure that that field gets unref'ed
973  cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
974  // this op can complete without an error
975  } else if (!GRPC_ERROR_IS_NONE(s->cancel_self_error)) {
976  // already self-canceled so still give it an error
977  error = GRPC_ERROR_REF(s->cancel_self_error);
978  } else {
979  INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
980  s->t->is_client ? "client" : "server",
981  op->send_initial_metadata ? " send_initial_metadata" : "",
982  op->send_message ? " send_message" : "",
983  op->send_trailing_metadata ? " send_trailing_metadata" : "",
984  op->recv_initial_metadata ? " recv_initial_metadata" : "",
985  op->recv_message ? " recv_message" : "",
986  op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
987  }
988 
// Sending metadata on a closed transport is an error. Initial metadata goes to
// the peer's to_read buffer, or to this stream's write buffer when no peer is
// attached; a second send of initial metadata is rejected.
989  inproc_stream* other = s->other_side;
990  if (GRPC_ERROR_IS_NONE(error) &&
991  (op->send_initial_metadata || op->send_trailing_metadata)) {
992  if (s->t->is_closed) {
993  error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
994  }
995  if (GRPC_ERROR_IS_NONE(error) && op->send_initial_metadata) {
996  grpc_metadata_batch* dest = (other == nullptr)
997  ? &s->write_buffer_initial_md
998  : &other->to_read_initial_md;
999  uint32_t* destflags = (other == nullptr)
1000  ? &s->write_buffer_initial_md_flags
1001  : &other->to_read_initial_md_flags;
1002  bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
1003  : &other->to_read_initial_md_filled;
1004  if (*destfilled || s->initial_md_sent) {
1005  // The buffer is already in use; that's an error!
1006  INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
1007  error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
1008  } else {
1009  if (!s->other_side_closed) {
1010  (void)fill_in_metadata(
1011  s, op->payload->send_initial_metadata.send_initial_metadata,
1012  op->payload->send_initial_metadata.send_initial_metadata_flags,
1013  dest, destflags, destfilled);
1014  }
// Client only: tighten the stored deadline to the smaller of its current value
// and the value taken from the outgoing initial metadata (InfFuture if absent).
// NOTE(review): initial_md_sent is set only inside this client branch in the
// visible code -- confirm that is intended for server streams.
1015  if (s->t->is_client) {
1016  grpc_core::Timestamp* dl =
1017  (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
1018  *dl = std::min(
1019  *dl, op->payload->send_initial_metadata.send_initial_metadata
1021  .value_or(grpc_core::Timestamp::InfFuture()));
1022  s->initial_md_sent = true;
1023  }
1024  }
1025  maybe_process_ops_locked(other, error);
1026  }
1027  }
1028 
1029  if (GRPC_ERROR_IS_NONE(error) &&
1030  (op->send_message || op->send_trailing_metadata ||
1031  op->recv_initial_metadata || op->recv_message ||
1032  op->recv_trailing_metadata)) {
1033  // Mark ops that need to be processed by the state machine
1034  if (op->send_message) {
1035  s->send_message_op = op;
1036  }
1037  if (op->send_trailing_metadata) {
1038  s->send_trailing_md_op = op;
1039  }
1040  if (op->recv_initial_metadata) {
1041  s->recv_initial_md_op = op;
1042  }
1043  if (op->recv_message) {
1044  s->recv_message_op = op;
1045  }
1046  if (op->recv_trailing_metadata) {
1047  s->recv_trailing_md_op = op;
1048  }
1049 
1050  // We want to initiate the state machine if:
1051  // 1. We want to send a message and the other side wants to receive
1052  // 2. We want to send trailing metadata and there isn't an unmatched send
1053  // or the other side wants trailing metadata
1054  // 3. We want initial metadata and the other side has sent it
1055  // 4. We want to receive a message and there is a message ready
1056  // 5. There is trailing metadata, even if nothing specifically wants
1057  // that because that can shut down the receive message as well
1058  if ((op->send_message && other && other->recv_message_op != nullptr) ||
1059  (op->send_trailing_metadata &&
1060  (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
1061  (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
1062  (op->recv_message && other && other->send_message_op != nullptr) ||
1063  (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
1064  op_state_machine_locked(s, error);
1065  } else {
// Nothing can make progress yet; flag the stream as having ops waiting.
1066  s->ops_needed = true;
1067  }
1068  } else {
1069  if (!GRPC_ERROR_IS_NONE(error)) {
1070  // Consume any send message that was sent here but that we are not
1071  // pushing to the other side
1072  if (op->send_message) {
1073  ResetSendMessage(op);
1074  }
1075  // Schedule op's closures that we didn't push to op state machine
1076  if (op->recv_initial_metadata) {
1077  if (op->payload->recv_initial_metadata.trailing_metadata_available !=
1078  nullptr) {
1079  // Set to true unconditionally, because we're failing the call, so
1080  // even if we haven't actually seen the send_trailing_metadata op
1081  // from the other side, we're going to return trailing metadata
1082  // anyway.
1083  *op->payload->recv_initial_metadata.trailing_metadata_available =
1084  true;
1085  }
1086  INPROC_LOG(
1087  GPR_INFO,
1088  "perform_stream_op error %p scheduling initial-metadata-ready %s",
1092  op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1094  }
1095  if (op->recv_message) {
1096  INPROC_LOG(
1097  GPR_INFO,
1098  "perform_stream_op error %p scheduling recv message-ready %s", s,
1100  if (op->payload->recv_message.call_failed_before_recv_message !=
1101  nullptr) {
1102  *op->payload->recv_message.call_failed_before_recv_message = true;
1103  }
1105  op->payload->recv_message.recv_message_ready,
1107  }
1108  if (op->recv_trailing_metadata) {
1110  "perform_stream_op error %p scheduling "
1111  "trailing-metadata-ready %s",
1115  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1117  }
1118  }
1119  INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %s", s,
1122  }
1123  gpr_mu_unlock(mu);
1125 }
1126 
// Shuts down transport `t`. Every call reports GRPC_CHANNEL_SHUTDOWN to the
// connectivity state tracker. The first call (is_closed still false) also sets
// is_closed and loops until t->stream_list is empty.
//
// NOTE(review): listing lines 1136, 1138 and 1140 are absent, so the call
// inside the loop is truncated. From the remaining arguments and the inline
// comment it appears to be cancel_stream_locked on the list head with a
// "Transport closed" error -- confirm against the real source.
1127 void close_transport_locked(inproc_transport* t) {
1128  INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
1129  t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(),
1130  "close transport");
1131  if (!t->is_closed) {
1132  t->is_closed = true;
1133  /* Also end all streams on this transport */
1134  while (t->stream_list != nullptr) {
1135  // cancel_stream_locked also adjusts stream list
1137  t->stream_list,
1139  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
1141  }
1142  }
1143 }
1144 
// perform_transport_op: applies transport-level op `op` to inproc transport
// `gt` while holding t->mu->mu. In order, it:
//   * adds a connectivity watcher if start_connectivity_watch is set;
//   * removes a watcher if stop_connectivity_watch is set;
//   * stores the accept-stream callback and user data if set_accept_stream;
//   * handles op->on_consumed (body not visible in this listing);
//   * if goaway_error or disconnect_with_error is not none, unrefs it and
//     marks the transport for closing.
//
// NOTE(review): listing lines 1145, 1161 and 1175 are absent, so the function
// signature, the on_consumed handling and the do_close action are not shown.
// Presumably do_close calls close_transport_locked(t) -- confirm.
1146  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1147  INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
1148  gpr_mu_lock(&t->mu->mu);
1149  if (op->start_connectivity_watch != nullptr) {
1150  t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1151  std::move(op->start_connectivity_watch));
1152  }
1153  if (op->stop_connectivity_watch != nullptr) {
1154  t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1155  }
1156  if (op->set_accept_stream) {
1157  t->accept_stream_cb = op->set_accept_stream_fn;
1158  t->accept_stream_data = op->set_accept_stream_user_data;
1159  }
1160  if (op->on_consumed) {
1162  }
1163 
// Either error requests a close; each is unref'ed here after being checked.
1164  bool do_close = false;
1165  if (!GRPC_ERROR_IS_NONE(op->goaway_error)) {
1166  do_close = true;
1167  GRPC_ERROR_UNREF(op->goaway_error);
1168  }
1169  if (!GRPC_ERROR_IS_NONE(op->disconnect_with_error)) {
1170  do_close = true;
1171  GRPC_ERROR_UNREF(op->disconnect_with_error);
1172  }
1173 
1174  if (do_close) {
1176  }
1177  gpr_mu_unlock(&t->mu->mu);
1178 }
1179 
// destroy_stream: tears down inproc stream `gs` on transport `gt`.
// close_stream_locked runs under the transport mutex; after the mutex is
// released the inproc_stream destructor is invoked explicitly (the storage
// itself is not freed here), and then_schedule_closure is run with
// GRPC_ERROR_NONE.
//
// NOTE(review): the first line of the signature (listing line 1180) is absent
// from this rendering.
1181  grpc_closure* then_schedule_closure) {
1182  INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
1183  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1184  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
1185  gpr_mu_lock(&t->mu->mu);
1186  close_stream_locked(s);
1187  gpr_mu_unlock(&t->mu->mu);
1188  s->~inproc_stream();
1189  grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1190  GRPC_ERROR_NONE);
1191 }
1192 
// destroy_transport: releases inproc transport `gt`. After the locked section
// it drops one reference on the peer transport (t->other_side) and one on `t`.
//
// NOTE(review): listing lines 1193 and 1197 are absent, so the signature and
// the statement executed while the mutex is held are not shown. Presumably
// that statement is close_transport_locked(t) -- confirm.
1194  inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1195  INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
1196  gpr_mu_lock(&t->mu->mu);
1198  gpr_mu_unlock(&t->mu->mu);
1199  t->other_side->unref();
1200  t->unref();
1201 }
1202 
1203 /*******************************************************************************
1204  * INTEGRATION GLUE
1205  */
1206 
// Pollset hook that ignores all three arguments and does nothing.
// Presumably installed in inproc_vtable; those entries are not visible in
// this listing -- confirm.
1207 void set_pollset(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1208  grpc_pollset* /*pollset*/) {
1209  // Nothing to do here
1210 }
1211 
// Pollset-set hook that ignores all three arguments and does nothing.
// Presumably installed in inproc_vtable; those entries are not visible in
// this listing -- confirm.
1212 void set_pollset_set(grpc_transport* /*gt*/, grpc_stream* /*gs*/,
1213  grpc_pollset_set* /*pollset_set*/) {
1214  // Nothing to do here
1215 }
1216 
// Always returns nullptr: this transport reports no grpc_endpoint.
1217 grpc_endpoint* get_endpoint(grpc_transport* /*t*/) { return nullptr; }
1218 
// Transport vtable for the inproc transport. The visible entries are the
// per-stream size (sizeof(inproc_stream)), the name "inproc", init_stream, a
// nullptr slot, and get_endpoint as the final entry.
// NOTE(review): listing lines 1222-1224 are absent, so the middle entries are
// not shown; the meaning of the nullptr slot is defined by
// grpc_transport_vtable, which is outside this file -- confirm there.
1219 const grpc_transport_vtable inproc_vtable = {
1220  sizeof(inproc_stream), "inproc",
1221  init_stream, nullptr,
1225  get_endpoint};
1226 
1227 /*******************************************************************************
1228  * Main inproc transport functions
1229  */
// Creates a linked server/client pair of inproc transports.
//
// One shared_mu and two inproc_transport objects are built with placement new
// on gpr_malloc'ed storage. Both transports receive the same `mu`; the server
// one is built with is_client=false and the client one with is_client=true.
// The two are cross-linked through other_side and returned through the
// out-parameters as grpc_transport*. Neither channel-args parameter is used.
1230 void inproc_transports_create(grpc_transport** server_transport,
1231  const grpc_channel_args* /*server_args*/,
1232  grpc_transport** client_transport,
1233  const grpc_channel_args* /*client_args*/) {
1234  INPROC_LOG(GPR_INFO, "inproc_transports_create");
1235  shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
1236  inproc_transport* st = new (gpr_malloc(sizeof(*st)))
1237  inproc_transport(&inproc_vtable, mu, /*is_client=*/false);
1238  inproc_transport* ct = new (gpr_malloc(sizeof(*ct)))
1239  inproc_transport(&inproc_vtable, mu, /*is_client=*/true);
// Each side keeps a pointer to its peer.
1240  st->other_side = ct;
1241  ct->other_side = st;
1242  *server_transport = reinterpret_cast<grpc_transport*>(st);
1243  *client_transport = reinterpret_cast<grpc_transport*>(ct);
1244 }
1245 } // namespace
1246 
// grpc_inproc_channel_create: builds a client channel connected in-process to
// `server` and returns it.
//
// Visible flow:
//   1. Trace the call, then prepare server args with the connection-idle (and,
//      per the comment, connection-age) arguments removed.
//   2. Copy the caller's `args` with a default authority of
//      "inproc.authority" added for the client side.
//   3. Create the transport pair and hand the server transport to
//      core_server->SetupTransport.
//   4. On success, create the channel with grpc_core::Channel::Create using
//      target "inproc" and GRPC_CLIENT_DIRECT_CHANNEL.
//   5. On either failure, log the error, destroy the remaining transport(s)
//      and build a replacement channel from `status` and a fixed message.
//   6. Destroy both channel-args objects and return `channel`.
//
// NOTE(review): this listing drops many lines (for example 1247, 1253-1255,
// 1259-1260, 1269-1271, 1273 and the lines that compute `status`), so the
// signature, the declarations of core_server / server_args / client_args /
// status, and the callee for the failure channels are not visible. Presumably
// the failure channels come from grpc_lame_client_channel_create -- confirm
// against the real source.
1248  const grpc_channel_args* args,
1249  void* /*reserved*/) {
1250  GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
1251  (server, args));
1252 
1254 
1256  // Remove max_connection_idle and max_connection_age channel arguments since
1257  // those do not apply to inproc transports.
1258  const char* args_to_remove[] = {GRPC_ARG_MAX_CONNECTION_IDLE_MS,
1261  core_server->channel_args(), args_to_remove,
1262  GPR_ARRAY_SIZE(args_to_remove));
1263  // Add a default authority channel argument for the client
1264  grpc_arg default_authority_arg;
1265  default_authority_arg.type = GRPC_ARG_STRING;
1266  default_authority_arg.key = const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY);
1267  default_authority_arg.value.string = const_cast<char*>("inproc.authority");
1268  args = grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
1272  .ToC();
1274  grpc_transport* server_transport;
1275  grpc_transport* client_transport;
1276  inproc_transports_create(&server_transport, server_args, &client_transport,
1277  client_args);
1278 
1279  // TODO(ncteisen): design and support channelz GetSocket for inproc.
1280  grpc_error_handle error = core_server->SetupTransport(
1281  server_transport, nullptr, server_args, nullptr);
1282  grpc_channel* channel = nullptr;
1283  if (GRPC_ERROR_IS_NONE(error)) {
1284  auto new_channel = grpc_core::Channel::Create(
1285  "inproc", grpc_core::ChannelArgs::FromC(client_args),
1286  GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
1287  if (!new_channel.ok()) {
1288  GPR_ASSERT(!channel);
1289  gpr_log(GPR_ERROR, "Failed to create client channel: %s",
1291  intptr_t integer;
1294  status = static_cast<grpc_status_code>(integer);
1295  }
1297  // client_transport was destroyed when grpc_channel_create_internal saw an
1298  // error.
1299  grpc_transport_destroy(server_transport);
1301  nullptr, status, "Failed to create client channel");
1302  } else {
1303  channel = new_channel->release()->c_ptr();
1304  }
1305  } else {
// Server-side setup failed: both transports are still alive here, so both are
// destroyed.
1306  GPR_ASSERT(!channel);
1307  gpr_log(GPR_ERROR, "Failed to create server channel: %s",
1309  intptr_t integer;
1312  status = static_cast<grpc_status_code>(integer);
1313  }
1315  grpc_transport_destroy(client_transport);
1316  grpc_transport_destroy(server_transport);
1318  nullptr, status, "Failed to create server channel");
1319  }
1320 
1321  // Free up created channel args
1322  grpc_channel_args_destroy(server_args);
1323  grpc_channel_args_destroy(client_args);
1324 
1325  // Now finish scheduled operations
1326 
1327  return channel;
1328 }
GRPC_CLOSURE_INIT
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
Definition: closure.h:115
grpc_arg
Definition: grpc_types.h:103
grpc_core::Server::SetupTransport
grpc_error_handle SetupTransport(grpc_transport *transport, grpc_pollset *accepting_pollset, const grpc_channel_args *args, const RefCountedPtr< channelz::SocketNode > &socket_node)
Definition: src/core/lib/surface/server.cc:605
slice.h
grpc_core::MetadataMap::Log
void Log(metadata_detail::LogFn log_fn) const
Definition: metadata_batch.h:1020
GPR_INFO
#define GPR_INFO
Definition: include/grpc/impl/codegen/log.h:56
grpc_core::GrpcTimeoutMetadata
Definition: metadata_batch.h:59
GRPC_CHANNEL_READY
@ GRPC_CHANNEL_READY
Definition: include/grpc/impl/codegen/connectivity_state.h:36
dst
static const char dst[]
Definition: test-fs-copyfile.c:37
gpr_mu_unlock
GPRAPI void gpr_mu_unlock(gpr_mu *mu)
pollset.h
vtable
static const grpc_transport_vtable vtable
Definition: binder_transport.cc:680
GRPC_ERROR_NONE
#define GRPC_ERROR_NONE
Definition: error.h:234
log.h
core_configuration.h
metadata_batch.h
GRPC_STATUS_UNAVAILABLE
@ GRPC_STATUS_UNAVAILABLE
Definition: include/grpc/impl/codegen/status.h:143
test_evm.cs
cs
Definition: test_evm.py:8
absl::StrCat
std::string StrCat(const AlphaNum &a, const AlphaNum &b)
Definition: abseil-cpp/absl/strings/str_cat.cc:98
metadata
Definition: cq_verifier.cc:48
grpc_arg::value
union grpc_arg::grpc_arg_value value
connectivity_state.h
grpc_transport_vtable
Definition: transport_impl.h:37
GRPC_ARG_STRING
@ GRPC_ARG_STRING
Definition: grpc_types.h:80
grpc_channel_args_copy_and_remove
grpc_channel_args * grpc_channel_args_copy_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove)
Definition: channel_args.cc:231
STREAM_REF
#define STREAM_REF(refs, reason)
Definition: inproc_transport.cc:232
grpc_core::Server::channel_args
const grpc_channel_args * channel_args() const
Definition: src/core/lib/surface/server.h:132
grpc_core::Slice
Definition: src/core/lib/slice/slice.h:282
GRPC_CLIENT_DIRECT_CHANNEL
@ GRPC_CLIENT_DIRECT_CHANNEL
Definition: channel_stack_type.h:34
grpc_core::HttpPathMetadata
Definition: metadata_batch.h:262
grpc_pollset_set
struct grpc_pollset_set grpc_pollset_set
Definition: iomgr_fwd.h:23
grpc_core::slice_detail::StaticConstructors< Slice >::FromStaticString
static Slice FromStaticString(const char *s)
Definition: src/core/lib/slice/slice.h:201
GRPC_ARG_MAX_CONNECTION_AGE_MS
#define GRPC_ARG_MAX_CONNECTION_AGE_MS
Definition: grpc_types.h:166
grpc_lame_client_channel_create
GRPCAPI grpc_channel * grpc_lame_client_channel_create(const char *target, grpc_status_code error_code, const char *error_message)
Definition: lame_client.cc:131
absl::string_view
Definition: abseil-cpp/absl/strings/string_view.h:167
grpc_core::Timestamp
Definition: src/core/lib/gprpp/time.h:62
gpr_free
GPRAPI void gpr_free(void *ptr)
Definition: alloc.cc:51
useful.h
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
error
grpc_error_handle error
Definition: retry_filter.cc:499
grpc_status_code
grpc_status_code
Definition: include/grpc/impl/codegen/status.h:28
error_ref_leak.err
err
Definition: error_ref_leak.py:35
grpc_core::ChannelArgs::FromC
static ChannelArgs FromC(const grpc_channel_args *args)
Definition: channel_args.cc:84
arena.h
gpr_malloc
GPRAPI void * gpr_malloc(size_t size)
Definition: alloc.cc:29
closure.h
INPROC_LOG
#define INPROC_LOG(...)
Definition: inproc_transport.cc:72
status
absl::Status status
Definition: rls.cc:251
absl::FormatConversionChar::s
@ s
xds_manager.p
p
Definition: xds_manager.py:60
grpc_inproc_trace
grpc_core::TraceFlag grpc_inproc_trace(false, "inproc")
channelz.h
do_close
static void do_close(void *handle)
Definition: test-ref.c:47
inproc_transport.h
grpc_core::Arena
Definition: src/core/lib/resource_quota/arena.h:45
grpc_channel_args
Definition: grpc_types.h:132
GRPC_TRACE_FLAG_ENABLED
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: debug/trace.h:114
T
#define T(upbtypeconst, upbtype, ctype, default_value)
gpr_refcount
Definition: impl/codegen/sync_generic.h:39
grpc_transport_op
Definition: transport.h:452
arena
grpc_core::ScopedArenaPtr arena
Definition: binder_transport_test.cc:237
channel_args_preconditioning.h
status.h
uint32_t
unsigned int uint32_t
Definition: stdint-msvc2008.h:80
get_endpoint
static grpc_endpoint * get_endpoint(grpc_transport *)
Definition: binder_transport.cc:674
gpr_mu_destroy
GPRAPI void gpr_mu_destroy(gpr_mu *mu)
DEBUG_LOCATION
#define DEBUG_LOCATION
Definition: debug_location.h:41
set_pollset_set
static void set_pollset_set(grpc_transport *, grpc_stream *, grpc_pollset_set *)
Definition: binder_transport.cc:132
refcount
size_t refcount
Definition: abseil-cpp/absl/strings/internal/cordz_info.cc:122
GRPC_ARG_DEFAULT_AUTHORITY
#define GRPC_ARG_DEFAULT_AUTHORITY
Definition: grpc_types.h:251
refs
std::vector< CordRep * > refs
Definition: cordz_info_statistics_test.cc:81
channel
wrapped_grpc_channel * channel
Definition: src/php/ext/grpc/call.h:33
asyncio_get_stats.args
args
Definition: asyncio_get_stats.py:40
grpc_arg::grpc_arg_value::string
char * string
Definition: grpc_types.h:107
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
perform_stream_op
static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op_batch *op)
Definition: binder_transport.cc:564
grpc_core::CoreConfiguration::Get
static const CoreConfiguration & Get()
Definition: core_configuration.h:82
gen_stats_data.c_str
def c_str(s, encoding='ascii')
Definition: gen_stats_data.py:38
mu
Mutex mu
Definition: server_config_selector_filter.cc:74
grpc_core::HttpAuthorityMetadata
Definition: metadata_batch.h:256
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
gpr_mu_init
GPRAPI void gpr_mu_init(gpr_mu *mu)
transport
grpc_transport transport
Definition: filter_fuzzer.cc:146
grpc_transport_destroy
void grpc_transport_destroy(grpc_transport *transport)
Definition: transport.cc:96
destroy_transport
static void destroy_transport(grpc_transport *gt)
Definition: binder_transport.cc:666
grpc.h
connectivity_state.h
log_metadata
static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, bool is_client, bool is_initial)
Definition: chttp2_transport.cc:1318
done
struct tab * done
Definition: bloaty/third_party/zlib/examples/enough.c:176
grpc_core::ChannelArgs::ToC
const grpc_channel_args * ToC() const
Definition: channel_args.cc:94
grpc_channel_args_destroy
void grpc_channel_args_destroy(grpc_channel_args *a)
Definition: channel_args.cc:360
ref
unsigned ref
Definition: cxa_demangle.cpp:4909
grpc_error_get_int
bool grpc_error_get_int(grpc_error_handle err, grpc_error_ints which, intptr_t *p)
Definition: error.cc:635
grpc_transport_stream_op_batch::payload
grpc_transport_stream_op_batch_payload * payload
Definition: transport.h:307
time.h
grpc_server
struct grpc_server grpc_server
Definition: grpc_types.h:65
intptr_t
_W64 signed int intptr_t
Definition: stdint-msvc2008.h:118
gpr_mu_lock
GPRAPI void gpr_mu_lock(gpr_mu *mu)
error.h
grpc_core::MetadataMap::Clear
void Clear()
Definition: metadata_batch.h:1214
gen_synthetic_protos.base
base
Definition: gen_synthetic_protos.py:31
batch
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:243
GPR_ERROR
#define GPR_ERROR
Definition: include/grpc/impl/codegen/log.h:57
min
#define min(a, b)
Definition: qsort.h:83
channel_stack_type.h
grpc_core::CoreConfiguration::channel_args_preconditioning
const ChannelArgsPreconditioning & channel_args_preconditioning() const
Definition: core_configuration.h:139
grpc_core::ExecCtx
Definition: exec_ctx.h:97
STREAM_UNREF
#define STREAM_UNREF(refs, reason)
Definition: inproc_transport.cc:233
n
int n
Definition: abseil-cpp/absl/container/btree_test.cc:1080
tests.qps.qps_worker.dest
dest
Definition: qps_worker.py:45
stdint.h
transport_fwd.h
msg
std::string msg
Definition: client_interceptors_end2end_test.cc:372
GRPC_ERROR_CREATE_FROM_STATIC_STRING
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
Definition: error.h:291
init_stream
static int init_stream(grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data, grpc_core::Arena *arena)
Definition: binder_transport.cc:103
value
const char * value
Definition: hpack_parser_table.cc:165
perform_transport_op
static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op)
Definition: binder_transport.cc:625
GPR_ARRAY_SIZE
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:129
grpc_core::Server
Definition: src/core/lib/surface/server.h:75
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
debug_location.h
key
const char * key
Definition: hpack_parser_table.cc:164
grpc_error_set_int
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
Definition: error.cc:613
absl::flags_internal
Definition: abseil-cpp/absl/flags/commandlineflag.h:40
server
Definition: examples/python/async_streaming/server.py:1
gpr_mu
pthread_mutex_t gpr_mu
Definition: impl/codegen/sync_posix.h:47
sink
FormatSinkImpl * sink
Definition: abseil-cpp/absl/strings/internal/str_format/float_conversion.cc:450
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
grpc_error_std_string
std::string grpc_error_std_string(grpc_error_handle error)
Definition: error.cc:944
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
absl::str_format_internal::LengthMod::t
@ t
ret
UniquePtr< SSL_SESSION > ret
Definition: ssl_x509.cc:1029
alloc.h
grpc_arg::key
char * key
Definition: grpc_types.h:105
grpc_transport_stream_op_batch_payload::send_message
grpc_core::SliceBuffer * send_message
Definition: transport.h:373
prefix
static const char prefix[]
Definition: head_of_line_blocking.cc:28
exec_ctx.h
server.h
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
grpc_core::ExecCtx::Run
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
Definition: exec_ctx.cc:98
ref_counted_ptr.h
grpc_channel
struct grpc_channel grpc_channel
Definition: grpc_types.h:62
grpc_transport
Definition: transport_impl.h:89
transport.h
grpc_stream
struct grpc_stream grpc_stream
Definition: transport.h:174
channel_args.h
api_trace.h
do_nothing
static void do_nothing(void *, grpc_error_handle)
Definition: dns_resolver_cooldown_test.cc:147
GRPC_STATUS_INTERNAL
@ GRPC_STATUS_INTERNAL
Definition: include/grpc/impl/codegen/status.h:129
grpc_core::Timestamp::InfFuture
static constexpr Timestamp InfFuture()
Definition: src/core/lib/gprpp/time.h:79
flags
uint32_t flags
Definition: retry_filter.cc:632
GRPC_CHANNEL_SHUTDOWN
@ GRPC_CHANNEL_SHUTDOWN
Definition: include/grpc/impl/codegen/connectivity_state.h:40
gpr_ref_init
GPRAPI void gpr_ref_init(gpr_refcount *r, int n)
Definition: sync.cc:86
transport_impl.h
iomgr_fwd.h
endpoint.h
gpr_unref
GPRAPI int gpr_unref(gpr_refcount *r)
Definition: sync.cc:103
grpc_error
Definition: error_internal.h:42
destroy_stream
static void destroy_stream(grpc_transport *, grpc_stream *gs, grpc_closure *then_schedule_closure)
Definition: binder_transport.cc:646
grpc_core::CppImplOf< Server, grpc_server >::FromC
static Server * FromC(grpc_server *c_type)
Definition: cpp_impl_of.h:30
grpc_arg::type
grpc_arg_type type
Definition: grpc_types.h:104
grpc_metadata_batch
Definition: metadata_batch.h:1259
grpc_pollset
Definition: bm_cq_multiple_threads.cc:37
grpc_core::ConnectivityStateTracker
Definition: src/core/lib/transport/connectivity_state.h:97
set_pollset
static void set_pollset(grpc_transport *gt, grpc_stream *gs, grpc_pollset *gp)
Definition: binder_transport.cc:128
grpc_transport_stream_op_batch
Definition: transport.h:284
sync.h
grpc_closure
Definition: closure.h:56
cancel_stream_locked
static void cancel_stream_locked(grpc_binder_transport *gbt, grpc_binder_stream *gbs, grpc_error_handle error)
Definition: binder_transport.cc:149
op
static grpc_op * op
Definition: test/core/fling/client.cc:47
slice_buffer.h
GRPC_ARG_MAX_CONNECTION_IDLE_MS
#define GRPC_ARG_MAX_CONNECTION_IDLE_MS
Definition: grpc_types.h:163
close_transport_locked
static void close_transport_locked(grpc_binder_transport *gbt)
Definition: binder_transport.cc:578
grpc_endpoint
Definition: endpoint.h:105
gpr_ref
GPRAPI void gpr_ref(gpr_refcount *r)
Definition: sync.cc:88
grpc_core::ChannelArgsPreconditioning::PreconditionChannelArgs
ChannelArgs PreconditionChannelArgs(const grpc_channel_args *args) const
Definition: channel_args_preconditioning.cc:34
if
if(p->owned &&p->wrapped !=NULL)
Definition: call.c:42
grpc_core::Channel::Create
static absl::StatusOr< RefCountedPtr< Channel > > Create(const char *target, ChannelArgs args, grpc_channel_stack_type channel_stack_type, grpc_transport *optional_transport)
Definition: channel.cc:202
grpc_inproc_channel_create
grpc_channel * grpc_inproc_channel_create(grpc_server *server, const grpc_channel_args *args, void *)
Definition: inproc_transport.cc:1247
grpc_channel_args_copy_and_add
grpc_channel_args * grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add)
Definition: channel_args.cc:224
grpc_stream_refcount
Definition: transport.h:178
GRPC_ERROR_INT_GRPC_STATUS
@ GRPC_ERROR_INT_GRPC_STATUS
grpc status code representing this error
Definition: error.h:66
GRPC_API_TRACE
#define GRPC_API_TRACE(fmt, nargs, args)
Definition: api_trace.h:48
absl::exchange
T exchange(T &obj, U &&new_value)
Definition: abseil-cpp/absl/utility/utility.h:314
GRPC_ERROR_IS_NONE
#define GRPC_ERROR_IS_NONE(err)
Definition: error.h:241
channel.h
port_platform.h


grpc
Author(s):
autogenerated on Fri May 16 2025 02:59:06