Go to the documentation of this file.
27 #include <gmock/gmock.h>
29 #include "absl/synchronization/notification.h"
49 void*
Tag(
intptr_t t) {
return reinterpret_cast<void*
>(
t); }
52 class TrailingMetadataRecordingFilter {
56 static bool trailing_metadata_available() {
60 static void reset_trailing_metadata_available() {
65 stream_network_state() {
69 static void reset_stream_network_state() {
73 static void reset_state() {
74 reset_trailing_metadata_available();
75 reset_stream_network_state();
83 new (
elem->call_data) CallData(
args);
90 auto* calld =
static_cast<CallData*
>(
elem->call_data);
94 static void StartTransportStreamOpBatch(
96 auto* calld =
static_cast<CallData*
>(
elem->call_data);
98 calld->trailing_metadata_available_ =
100 calld->original_recv_initial_metadata_ready_ =
103 &calld->recv_initial_metadata_ready_;
106 calld->recv_trailing_metadata_ =
108 calld->original_recv_trailing_metadata_ready_ =
111 &calld->recv_trailing_metadata_ready_;
121 RecvTrailingMetadataReady,
this,
nullptr);
125 auto* calld =
static_cast<CallData*
>(
arg);
127 *calld->trailing_metadata_available_;
133 auto* calld =
static_cast<CallData*
>(
arg);
135 calld->recv_trailing_metadata_->get(GrpcStreamNetworkState());
137 calld->original_recv_trailing_metadata_ready_,
151 new (
elem->channel_data) TrailingMetadataRecordingFilter();
157 static_cast<TrailingMetadataRecordingFilter*
>(
elem->channel_data);
158 chand->~TrailingMetadataRecordingFilter();
167 CallData::StartTransportStreamOpBatch,
174 sizeof(TrailingMetadataRecordingFilter),
179 "trailing-metadata-recording-filter",
187 explicit StreamsNotSeenTest(
bool server_allows_streams =
true)
190 TrailingMetadataRecordingFilter::reset_state();
217 creds, &client_channel_args);
234 ~StreamsNotSeenTest()
override {
260 StreamsNotSeenTest*
self =
static_cast<StreamsNotSeenTest*
>(
arg);
268 if (
self->server_allows_streams_) {
269 constexpr char kHttp2SettingsFrame[] =
270 "\x00\x00\x00\x04\x00\x00\x00\x00\x00";
271 self->Write(absl::string_view(kHttp2SettingsFrame,
272 sizeof(kHttp2SettingsFrame) - 1));
275 constexpr char kHttp2SettingsFrame[] =
276 "\x00\x00\x06\x04\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00";
277 self->Write(absl::string_view(kHttp2SettingsFrame,
278 sizeof(kHttp2SettingsFrame) - 1));
280 self->connect_notification_.Notify();
299 const char ping_bytes[] =
300 "\x00\x00\x08\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
301 const char ping_ack_bytes[] =
302 "\x00\x00\x08\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
308 void SendGoaway(
uint32_t last_stream_id) {
319 &on_write_done_notification_,
nullptr);
331 on_write_done_notification_->
Notify();
335 StreamsNotSeenTest*
self =
static_cast<StreamsNotSeenTest*
>(
arg);
339 for (
size_t i = 0;
i <
self->read_buffer_.count; ++
i) {
343 self->read_cv_.SignalAll();
350 self->read_end_notification_.Notify();
356 std::atomic<bool>
done{
false};
400 TEST_F(StreamsNotSeenTest, StartStreamBeforeGoaway) {
413 const char* error_string;
457 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
459 TrailingMetadataRecordingFilter::stream_network_state().has_value());
460 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().
value(),
463 gpr_free(
const_cast<char*
>(error_string));
474 TEST_F(StreamsNotSeenTest, TransportDestroyed) {
487 const char* error_string;
532 TrailingMetadataRecordingFilter::stream_network_state().has_value());
534 gpr_free(
const_cast<char*
>(error_string));
544 TEST_F(StreamsNotSeenTest, StartStreamAfterGoaway) {
562 const char* error_string;
596 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
598 TrailingMetadataRecordingFilter::stream_network_state().has_value());
599 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().
value(),
602 gpr_free(
const_cast<char*
>(error_string));
613 class ZeroConcurrencyTest :
public StreamsNotSeenTest {
615 ZeroConcurrencyTest() : StreamsNotSeenTest(
false) {}
622 TEST_F(ZeroConcurrencyTest, StartStreamBeforeGoaway) {
635 const char* error_string;
674 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
676 TrailingMetadataRecordingFilter::stream_network_state().has_value());
677 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().
value(),
680 gpr_free(
const_cast<char*
>(error_string));
690 TEST_F(ZeroConcurrencyTest, TransportDestroyed) {
703 const char* error_string;
740 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
742 TrailingMetadataRecordingFilter::stream_network_state().has_value());
743 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().
value(),
746 gpr_free(
const_cast<char*
>(error_string));
756 int main(
int argc,
char** argv) {
765 builder->channel_init()->RegisterStage(
772 auto it =
builder->mutable_stack()->end();
774 builder->mutable_stack()->insert(
it, filter);
#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler)
grpc_closure recv_initial_metadata_ready_
#define EXPECT_FALSE(condition)
GPRAPI void grpc_slice_unref(grpc_slice s)
grpc_slice_buffer read_buffer_
void test_tcp_server_init(test_tcp_server *server, grpc_tcp_server_cb on_connect, void *user_data)
GPRAPI void grpc_slice_buffer_destroy(grpc_slice_buffer *sb)
grpc_metadata_array * trailing_metadata
gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s)
grpc_status_code * status
@ GRPC_STATUS_UNAVAILABLE
std::unique_ptr< std::thread > server_poll_thread_
void test_tcp_server_start(test_tcp_server *server, int port)
void StrAppend(std::string *dest, const AlphaNum &a)
return memset(p, 0, total)
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op)
int main(int argc, char **argv)
GRPCAPI grpc_connectivity_state grpc_channel_check_connectivity_state(grpc_channel *channel, int try_to_connect)
absl::string_view StringViewFromSlice(const grpc_slice &slice)
GPRAPI void gpr_free(void *ptr)
void grpc_endpoint_read(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, bool urgent, int min_progress_size)
#define ABSL_GUARDED_BY(x)
static int Init(CMessage *self, PyObject *args, PyObject *kwargs)
static grpc_channel_filter kFilterVtable
void grpc_call_stack_ignore_set_pollset_or_pollset_set(grpc_call_element *, grpc_polling_entity *)
union grpc_op::grpc_op_data data
void test_tcp_server_destroy(test_tcp_server *server)
GRPCAPI void grpc_metadata_array_destroy(grpc_metadata_array *array)
grpc_closure on_write_done_
#define GRPC_ARG_ENABLE_RETRIES
static grpc_metadata_array trailing_metadata_recv
void grpc_chttp2_goaway_append(uint32_t last_stream_id, uint32_t error_code, const grpc_slice &debug_data, grpc_slice_buffer *slice_buffer)
grpc_completion_queue * cq_
@ GRPC_OP_RECV_INITIAL_METADATA
grpc_closure recv_trailing_metadata_ready_
grpc_closure * original_recv_trailing_metadata_ready_
def c_str(s, encoding='ascii')
void grpc_channel_stack_no_post_init(grpc_channel_stack *, grpc_channel_element *)
GRPCAPI void grpc_call_unref(grpc_call *call)
const char ** error_string
gpr_timespec grpc_timeout_milliseconds_to_deadline(int64_t time_ms)
absl::Notification connect_notification_
void cq_verifier_destroy(cq_verifier *v)
GPRAPI void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer *sb)
struct grpc_call grpc_call
std::string JoinHostPort(absl::string_view host, int port)
void grpc_channel_next_get_info(grpc_channel_element *elem, const grpc_channel_info *channel_info)
GRPCAPI grpc_channel_credentials * grpc_insecure_credentials_create()
void test_tcp_server_poll(test_tcp_server *server, int milliseconds)
GPRAPI grpc_slice grpc_slice_from_static_string(const char *source)
cq_verifier * cq_verifier_create(grpc_completion_queue *cq)
grpc_transport_stream_op_batch_payload * payload
TEST_F(AuthorizationMatchersTest, AlwaysAuthorizationMatcher)
GPRAPI grpc_slice grpc_empty_slice(void)
ABSL_NAMESPACE_BEGIN bool StrContains(absl::string_view haystack, absl::string_view needle) noexcept
static absl::optional< GrpcStreamNetworkState::ValueType > stream_network_state_
grpc_metadata_batch * recv_trailing_metadata_
#define GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA
void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(bool disable)
void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_error_handle why)
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op_batch *op)
grpc_transport_stream_op_batch * batch
#define CQ_EXPECT_COMPLETION(v, tag, success)
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
int grpc_pick_unused_port_or_die(void)
void grpc_endpoint_destroy(grpc_endpoint *ep)
GPRAPI void grpc_slice_buffer_init(grpc_slice_buffer *sb)
#define GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc)
grpc_slice * status_details
grpc_closure on_read_done_
static grpc_slice details
GRPCAPI void grpc_channel_credentials_release(grpc_channel_credentials *creds)
GRPCAPI grpc_call * grpc_channel_create_call(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, grpc_slice method, const grpc_slice *host, gpr_timespec deadline, void *reserved)
bool recv_initial_metadata
constexpr Duration Seconds(T n)
#define GPR_ARRAY_SIZE(array)
GPRAPI void grpc_slice_buffer_add(grpc_slice_buffer *sb, grpc_slice slice)
GRPCAPI grpc_channel * grpc_channel_create(const char *target, grpc_channel_credentials *creds, const grpc_channel_args *args)
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
#define GRPC_ERROR_REF(err)
#define GRPC_PROPAGATE_DEFAULTS
void Destroy(grpc_transport *)
int Write(int fd, const void *buf, unsigned int count)
static StaticSlice FromStaticBuffer(const void *s, size_t len)
grpc_arg grpc_channel_arg_integer_create(char *name, int value)
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq)
@ GRPC_OP_SEND_INITIAL_METADATA
grpc_core::ExecCtx exec_ctx
void BuildCoreConfiguration(CoreConfiguration::Builder *builder)
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
grpc_metadata_batch * recv_initial_metadata
#define GRPC_ARG_HTTP2_BDP_PROBE
GRPCAPI grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved)
void cq_verify(cq_verifier *v, int timeout_sec)
#define ASSERT_TRUE(condition)
GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
GRPCAPI void grpc_channel_destroy(grpc_channel *channel)
static void RunWithSpecialConfiguration(BuildFunc build_configuration, RunFunc code_to_run)
struct grpc_channel grpc_channel
bool server_allows_streams_
GRPCAPI void grpc_channel_watch_connectivity_state(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag)
bool recv_trailing_metadata
void grpc_endpoint_write(grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb, void *arg, int max_frame_size)
absl::Notification read_end_notification_
bool WaitForNotificationWithTimeout(absl::Duration timeout) const
GRPCAPI grpc_completion_queue * grpc_completion_queue_create_for_next(void *reserved)
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
grpc_metadata_batch * recv_trailing_metadata
grpc_completion_type type
GRPCAPI void grpc_init(void)
@ GRPC_OP_RECV_STATUS_ON_CLIENT
PHP_PROTO_OBJECT_FREE_END PHP_PROTO_OBJECT_DTOR_END intern self
std::atomic< bool > shutdown_
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset)
static grpc_metadata_array initial_metadata_recv
GRPCAPI grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved)
GRPCAPI void grpc_shutdown(void)
static void Run(const DebugLocation &location, grpc_closure *closure, grpc_error_handle error)
static uv_thread_t thread
@ GRPC_OP_SEND_CLOSE_FROM_CLIENT
GRPCAPI void grpc_metadata_array_init(grpc_metadata_array *array)
bool * trailing_metadata_available_
#define GRPC_ERROR_IS_NONE(err)
grpc_closure * original_recv_initial_metadata_ready_
grpc
Author(s):
autogenerated on Fri May 16 2025 03:00:20