pipe_test.cc
Go to the documentation of this file.
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <memory>
18 #include <tuple>
19 #include <utility>
20 
21 #include "absl/memory/memory.h"
22 #include "absl/status/status.h"
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25 
27 
37 
40 
41 namespace grpc_core {
42 
44  ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
45 
46 TEST(PipeTest, CanSendAndReceive) {
47  StrictMock<MockFunction<void(absl::Status)>> on_done;
48  EXPECT_CALL(on_done, Call(absl::OkStatus()));
50  [] {
51  Pipe<int> pipe;
52  return Seq(
53  // Concurrently: send 42 into the pipe, and receive from the pipe.
54  Join(pipe.sender.Push(42), pipe.receiver.Next()),
55  // Once complete, verify successful sending and the received value
56  // is 42.
57  [](std::tuple<bool, absl::optional<int>> result) {
58  EXPECT_EQ(result, std::make_tuple(true, absl::optional<int>(42)));
59  return absl::OkStatus();
60  });
61  },
63  [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
65 }
66 
67 TEST(PipeTest, CanReceiveAndSend) {
68  StrictMock<MockFunction<void(absl::Status)>> on_done;
69  EXPECT_CALL(on_done, Call(absl::OkStatus()));
71  [] {
72  Pipe<int> pipe;
73  return Seq(
74  // Concurrently: receive from the pipe, and send 42 into the pipe.
75  Join(pipe.receiver.Next(), pipe.sender.Push(42)),
76  // Once complete, verify the received value is 42 and successful
77  // sending.
78  [](std::tuple<absl::optional<int>, bool> result) {
79  EXPECT_EQ(result, std::make_tuple(absl::optional<int>(42), true));
80  return absl::OkStatus();
81  });
82  },
84  [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
86 }
87 
88 TEST(PipeTest, CanSeeClosedOnSend) {
89  StrictMock<MockFunction<void(absl::Status)>> on_done;
90  EXPECT_CALL(on_done, Call(absl::OkStatus()));
92  [] {
93  Pipe<int> pipe;
94  auto sender = std::move(pipe.sender);
95  // Push 42 onto the pipe - this will the pipe's one-deep send buffer.
96  EXPECT_TRUE(NowOrNever(sender.Push(42)).has_value());
97  auto receiver = std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
98  absl::make_unique<PipeReceiver<int>>(std::move(pipe.receiver)));
99  return Seq(
100  // Concurrently:
101  // - push 43 into the sender, which will stall because the buffer is
102  // full
103  // - and close the receiver, which will fail the pending send.
104  Join(sender.Push(43),
105  [receiver] {
106  receiver->reset();
107  return absl::OkStatus();
108  }),
109  // Verify both that the send failed and that we executed the close.
110  [](std::tuple<bool, absl::Status> result) {
111  EXPECT_EQ(result, std::make_tuple(false, absl::OkStatus()));
112  return absl::OkStatus();
113  });
114  },
116  [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
118 }
119 
120 TEST(PipeTest, CanSeeClosedOnReceive) {
121  StrictMock<MockFunction<void(absl::Status)>> on_done;
122  EXPECT_CALL(on_done, Call(absl::OkStatus()));
123  MakeActivity(
124  [] {
125  Pipe<int> pipe;
126  auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
127  absl::make_unique<PipeSender<int>>(std::move(pipe.sender)));
128  auto receiver = std::move(pipe.receiver);
129  return Seq(
130  // Concurrently:
131  // - wait for a received value (will stall forever since we push
132  // nothing into the queue)
133  // - close the sender, which will signal the receiver to return an
134  // end-of-stream.
135  Join(receiver.Next(),
136  [sender] {
137  sender->reset();
138  return absl::OkStatus();
139  }),
140  // Verify we received end-of-stream and closed the sender.
141  [](std::tuple<absl::optional<int>, absl::Status> result) {
142  EXPECT_EQ(result, std::make_tuple(absl::optional<int>(),
143  absl::OkStatus()));
144  return absl::OkStatus();
145  });
146  },
148  [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
150 }
151 
152 } // namespace grpc_core
153 
154 int main(int argc, char** argv) {
155  ::testing::InitGoogleTest(&argc, argv);
156  return RUN_ALL_TESTS();
157 }
testing::StrictMock
Definition: bloaty/third_party/googletest/googlemock/include/gmock/gmock-nice-strict.h:148
_gevent_test_main.result
result
Definition: _gevent_test_main.py:96
grpc_core::MakeScopedArena
ScopedArenaPtr MakeScopedArena(size_t initial_size, MemoryAllocator *memory_allocator)
Definition: src/core/lib/resource_quota/arena.h:130
grpc_event_engine::experimental::MemoryAllocator
Definition: memory_allocator.h:35
grpc_core
Definition: call_metric_recorder.h:31
absl::OkStatus
Status OkStatus()
Definition: third_party/abseil-cpp/absl/status/status.h:882
grpc_core::TEST
TEST(AvlTest, NoOp)
Definition: avl_test.cc:21
grpc_core::NowOrNever
auto NowOrNever(Promise promise) -> absl::optional< typename promise_detail::PromiseLike< Promise >::Result >
Definition: promise/promise.h:42
status
absl::Status status
Definition: rls.cc:251
seq.h
grpc_core::Pipe::receiver
PipeReceiver< T > receiver
Definition: pipe.h:315
grpc_core::MakeActivity
ActivityPtr MakeActivity(Factory promise_factory, WakeupScheduler wakeup_scheduler, OnDone on_done, Contexts &&... contexts)
Definition: activity.h:522
absl::move
constexpr absl::remove_reference_t< T > && move(T &&t) noexcept
Definition: abseil-cpp/absl/utility/utility.h:221
pipe.h
grpc_core::Pipe::sender
PipeSender< T > sender
Definition: pipe.h:314
absl::optional
Definition: abseil-cpp/absl/types/internal/optional.h:61
grpc_core::g_memory_allocator
static auto * g_memory_allocator
Definition: call_finalization_test.cc:24
grpc_core::ResourceQuota::Default
static ResourceQuotaRefPtr Default()
Definition: resource_quota.cc:27
RUN_ALL_TESTS
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:2471
main
int main(int argc, char **argv)
Definition: pipe_test.cc:154
resource_quota.h
promise.h
EXPECT_CALL
#define EXPECT_CALL(obj, call)
test_wakeup_schedulers.h
join.h
testing::InitGoogleTest
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: bloaty/third_party/googletest/googletest/src/gtest.cc:6106
grpc_core::Pipe
Definition: pipe.h:34
tests.unit._server_ssl_cert_config_test.Call
Call
Definition: _server_ssl_cert_config_test.py:70
absl::Status
Definition: third_party/abseil-cpp/absl/status/status.h:424
testing::MockFunction
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:11854
ref_counted_ptr.h
memory_quota.h
EXPECT_TRUE
#define EXPECT_TRUE(condition)
Definition: bloaty/third_party/googletest/googletest/include/gtest/gtest.h:1967
grpc_core::NoWakeupScheduler
Definition: test_wakeup_schedulers.h:28
memory_allocator.h
grpc_core::Seq
promise_detail::Seq< Functors... > Seq(Functors... functors)
Definition: seq.h:62
activity.h
basic_seq.h
grpc_core::Join
promise_detail::Join< Promise... > Join(Promise... promises)
Definition: join.h:49


grpc
Author(s):
autogenerated on Thu Mar 13 2025 03:00:52