caching_interceptor.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <map>
20 
22 
23 #ifdef BAZEL_BUILD
24 #include "examples/protos/keyvaluestore.grpc.pb.h"
25 #else
26 #include "keyvaluestore.grpc.pb.h"
27 #endif
28 
29 // This is a naive implementation of a cache. A new cache is for each call. For
30 // each new key request, the key is first searched in the map and if found, the
31 // interceptor fills in the return value without making a request to the server.
32 // Only if the key is not found in the cache do we make a request.
34  public:
36 
37  void Intercept(
38  ::grpc::experimental::InterceptorBatchMethods* methods) override {
39  bool hijack = false;
40  if (methods->QueryInterceptionHookPoint(
42  PRE_SEND_INITIAL_METADATA)) {
43  // Hijack all calls
44  hijack = true;
45  // Create a stream on which this interceptor can make requests
46  stub_ = keyvaluestore::KeyValueStore::NewStub(
47  methods->GetInterceptedChannel());
48  stream_ = stub_->GetValues(&context_);
49  }
50  if (methods->QueryInterceptionHookPoint(
52  // We know that clients perform a Read and a Write in a loop, so we don't
53  // need to maintain a list of the responses.
54  std::string requested_key;
55  const keyvaluestore::Request* req_msg =
56  static_cast<const keyvaluestore::Request*>(methods->GetSendMessage());
57  if (req_msg != nullptr) {
58  requested_key = req_msg->key();
59  } else {
60  // The non-serialized form would not be available in certain scenarios,
61  // so add a fallback
62  keyvaluestore::Request req_msg;
63  auto* buffer = methods->GetSerializedSendMessage();
64  auto copied_buffer = *buffer;
65  GPR_ASSERT(
67  &copied_buffer, &req_msg)
68  .ok());
69  requested_key = req_msg.key();
70  }
71 
72  // Check if the key is present in the map
73  auto search = cached_map_.find(requested_key);
74  if (search != cached_map_.end()) {
75  std::cout << "Key " << requested_key << "found in map";
76  response_ = search->second;
77  } else {
78  std::cout << "Key " << requested_key << "not found in cache";
79  // Key was not found in the cache, so make a request
81  req.set_key(requested_key);
82  stream_->Write(req);
84  stream_->Read(&resp);
85  response_ = resp.value();
86  // Insert the pair in the cache for future requests
87  cached_map_.insert({requested_key, response_});
88  }
89  }
90  if (methods->QueryInterceptionHookPoint(
92  stream_->WritesDone();
93  }
94  if (methods->QueryInterceptionHookPoint(
97  static_cast<keyvaluestore::Response*>(methods->GetRecvMessage());
98  resp->set_value(response_);
99  }
100  if (methods->QueryInterceptionHookPoint(
102  auto* status = methods->GetRecvStatus();
104  }
105  // One of Hijack or Proceed always needs to be called to make progress.
106  if (hijack) {
107  // Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in
108  // the hook points
109  methods->Hijack();
110  } else {
111  // Proceed is an indicator that the interceptor is done intercepting the
112  // batch.
113  methods->Proceed();
114  }
115  }
116 
117  private:
119  std::unique_ptr<keyvaluestore::KeyValueStore::Stub> stub_;
120  std::unique_ptr<
123  std::map<std::string, std::string> cached_map_;
125 };
126 
129  public:
131  grpc::experimental::ClientRpcInfo* info) override {
132  return new CachingInterceptor(info);
133  }
134 };
CachingInterceptorFactory
Definition: caching_interceptor.h:127
CachingInterceptor::context_
grpc::ClientContext context_
Definition: caching_interceptor.h:118
grpc::experimental::InterceptionHookPoints::PRE_SEND_CLOSE
@ PRE_SEND_CLOSE
CachingInterceptor::Intercept
void Intercept(::grpc::experimental::InterceptorBatchMethods *methods) override
Definition: caching_interceptor.h:37
grpc::experimental::InterceptorBatchMethods::GetRecvMessage
virtual void * GetRecvMessage()=0
grpc::experimental::InterceptionHookPoints::PRE_RECV_MESSAGE
@ PRE_RECV_MESSAGE
grpc::experimental::InterceptionHookPoints::PRE_RECV_STATUS
@ PRE_RECV_STATUS
grpc::experimental::ClientInterceptorFactoryInterface
Definition: impl/codegen/client_interceptor.h:48
testing::internal::string
::std::string string
Definition: bloaty/third_party/protobuf/third_party/googletest/googletest/include/gtest/internal/gtest-port.h:881
CachingInterceptor::stub_
std::unique_ptr< keyvaluestore::KeyValueStore::Stub > stub_
Definition: caching_interceptor.h:119
status
absl::Status status
Definition: rls.cc:251
grpc::experimental::InterceptorBatchMethods::QueryInterceptionHookPoint
virtual bool QueryInterceptionHookPoint(InterceptionHookPoints type)=0
CachingInterceptor
Definition: caching_interceptor.h:33
grpc::experimental::ClientRpcInfo
Definition: impl/codegen/client_interceptor.h:68
CachingInterceptor::stream_
std::unique_ptr< grpc::ClientReaderWriter< keyvaluestore::Request, keyvaluestore::Response > > stream_
Definition: caching_interceptor.h:122
grpc::SerializationTraits
Definition: grpcpp/impl/codegen/serialization_traits.h:60
CachingInterceptor::CachingInterceptor
CachingInterceptor(grpc::experimental::ClientRpcInfo *info)
Definition: caching_interceptor.h:35
search
Definition: search.py:1
client_interceptor.h
grpc::ClientReaderWriter
Definition: grpcpp/impl/codegen/channel_interface.h:35
GPR_ASSERT
#define GPR_ASSERT(x)
Definition: include/grpc/impl/codegen/log.h:94
req
static uv_connect_t req
Definition: test-connection-fail.c:30
http2_server_health_check.resp
resp
Definition: http2_server_health_check.py:31
grpc::Status::OK
static const Status & OK
An OK pre-defined instance.
Definition: include/grpcpp/impl/codegen/status.h:113
buffer
char buffer[1024]
Definition: libuv/docs/code/idle-compute/main.c:8
grpc::experimental::InterceptorBatchMethods
Definition: impl/codegen/interceptor.h:98
grpc::experimental::InterceptorBatchMethods::Hijack
virtual void Hijack()=0
grpc::ClientContext
Definition: grpcpp/impl/codegen/client_context.h:195
grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE
@ PRE_SEND_MESSAGE
grpc::experimental::InterceptorBatchMethods::GetSerializedSendMessage
virtual ByteBuffer * GetSerializedSendMessage()=0
CachingInterceptor::response_
std::string response_
Definition: caching_interceptor.h:124
grpc::experimental::Interceptor
Definition: impl/codegen/interceptor.h:221
ok
bool ok
Definition: async_end2end_test.cc:197
CachingInterceptorFactory::CreateClientInterceptor
grpc::experimental::Interceptor * CreateClientInterceptor(grpc::experimental::ClientRpcInfo *info) override
Definition: caching_interceptor.h:130
grpc::experimental::InterceptorBatchMethods::GetInterceptedChannel
virtual std::unique_ptr< ChannelInterface > GetInterceptedChannel()=0
demo_pb2.Request
Request
Definition: demo_pb2.py:108
CachingInterceptor::cached_map_
std::map< std::string, std::string > cached_map_
Definition: caching_interceptor.h:123
grpc::experimental::InterceptorBatchMethods::GetSendMessage
virtual const void * GetSendMessage()=0
grpc::experimental::InterceptorBatchMethods::Proceed
virtual void Proceed()=0
demo_pb2.Response
Response
Definition: demo_pb2.py:115
grpc::experimental::InterceptionHookPoints
InterceptionHookPoints
Definition: impl/codegen/interceptor.h:59
grpc::experimental::InterceptorBatchMethods::GetRecvStatus
virtual Status * GetRecvStatus()=0


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:42