cfstream_handle.cc
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
20 
23 
24 #ifdef GRPC_CFSTREAM
25 #import <CoreFoundation/CoreFoundation.h>
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/atm.h>
29 #include <grpc/support/sync.h>
30 
37 
39 
40 GrpcLibraryInitHolder::GrpcLibraryInitHolder() { grpc_init(); }
41 
42 GrpcLibraryInitHolder::~GrpcLibraryInitHolder() { grpc_shutdown(); }
43 
44 void* CFStreamHandle::Retain(void* info) {
45  CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
46  CFSTREAM_HANDLE_REF(handle, "retain");
47  return info;
48 }
49 
50 void CFStreamHandle::Release(void* info) {
51  CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
52  CFSTREAM_HANDLE_UNREF(handle, "release");
53 }
54 
55 CFStreamHandle* CFStreamHandle::CreateStreamHandle(
56  CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
57  return new CFStreamHandle(read_stream, write_stream);
58 }
59 
60 void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
61  CFStreamEventType type,
62  void* client_callback_info) {
63  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
66  CFErrorRef stream_error;
67  CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
68  if (grpc_tcp_trace.enabled()) {
69  gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
70  stream, type, client_callback_info);
71  }
72  switch (type) {
73  case kCFStreamEventOpenCompleted:
74  handle->open_event_.SetReady();
75  break;
76  case kCFStreamEventHasBytesAvailable:
77  case kCFStreamEventEndEncountered:
78  handle->read_event_.SetReady();
79  break;
80  case kCFStreamEventErrorOccurred:
81  stream_error = CFReadStreamCopyError(stream);
83  GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "read error"),
85  CFRelease(stream_error);
86  handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
87  handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
88  handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
90  break;
91  default:
92  GPR_UNREACHABLE_CODE(return );
93  }
94 }
95 void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
96  CFStreamEventType type,
97  void* clientCallBackInfo) {
98  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
101  CFErrorRef stream_error;
102  CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
103  if (grpc_tcp_trace.enabled()) {
104  gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
105  stream, type, clientCallBackInfo);
106  }
107  switch (type) {
108  case kCFStreamEventOpenCompleted:
109  handle->open_event_.SetReady();
110  break;
111  case kCFStreamEventCanAcceptBytes:
112  case kCFStreamEventEndEncountered:
113  handle->write_event_.SetReady();
114  break;
115  case kCFStreamEventErrorOccurred:
116  stream_error = CFWriteStreamCopyError(stream);
118  GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write error"),
120  CFRelease(stream_error);
121  handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
122  handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
123  handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
125  break;
126  default:
127  GPR_UNREACHABLE_CODE(return );
128  }
129 }
130 
131 CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
132  CFWriteStreamRef write_stream) {
133  gpr_ref_init(&refcount_, 1);
134  open_event_.InitEvent();
135  read_event_.InitEvent();
136  write_event_.InitEvent();
137  dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);
138  CFStreamClientContext ctx = {0, static_cast<void*>(this),
139  CFStreamHandle::Retain, CFStreamHandle::Release,
140  nil};
141  CFReadStreamSetClient(
142  read_stream,
143  kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
144  kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
145  CFStreamHandle::ReadCallback, &ctx);
146  CFWriteStreamSetClient(
147  write_stream,
148  kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
149  kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
150  CFStreamHandle::WriteCallback, &ctx);
151  grpc_apple_register_read_stream(read_stream, dispatch_queue_);
152  grpc_apple_register_write_stream(write_stream, dispatch_queue_);
153 }
154 
155 CFStreamHandle::~CFStreamHandle() {
156  open_event_.DestroyEvent();
157  read_event_.DestroyEvent();
158  write_event_.DestroyEvent();
159  dispatch_release(dispatch_queue_);
160 }
161 
162 void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
163  open_event_.NotifyOn(closure);
164 }
165 
166 void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
167  read_event_.NotifyOn(closure);
168 }
169 
170 void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
171  write_event_.NotifyOn(closure);
172 }
173 
175  open_event_.SetShutdown(GRPC_ERROR_REF(error));
176  read_event_.SetShutdown(GRPC_ERROR_REF(error));
177  write_event_.SetShutdown(GRPC_ERROR_REF(error));
179 }
180 
181 void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
182  if (grpc_tcp_trace.enabled()) {
183  gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
185  "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
186  reason, val, val + 1);
187  }
188  gpr_ref(&refcount_);
189 }
190 
191 void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
192  if (grpc_tcp_trace.enabled()) {
193  gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
195  "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
196  reason, val, val - 1);
197  }
198  if (gpr_unref(&refcount_)) {
199  delete this;
200  }
201 }
202 
203 #else
204 
/* Placeholder symbol compiled when GRPC_CFSTREAM is not defined, so that the
 * grpc_cfstream library is never empty. Intentionally does nothing.
 */
void CFStreamPhony() {
}
209 
210 #endif
trace.h
gpr_atm_no_barrier_load
#define gpr_atm_no_barrier_load(p)
Definition: impl/codegen/atm_gcc_atomic.h:53
GRPC_STATUS_UNAVAILABLE
@ GRPC_STATUS_UNAVAILABLE
Definition: include/grpc/impl/codegen/status.h:143
ctx
Definition: benchmark-async.c:30
error
grpc_error_handle error
Definition: retry_filter.cc:499
file
Definition: bloaty/third_party/zlib/examples/gzappend.c:170
closure.h
grpc_core::ApplicationCallbackExecCtx
Definition: exec_ctx.h:283
GPR_LOG_SEVERITY_DEBUG
@ GPR_LOG_SEVERITY_DEBUG
Definition: include/grpc/impl/codegen/log.h:46
memory.h
grpc_tcp_trace
grpc_core::TraceFlag grpc_tcp_trace(false, "tcp")
gpr_log
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
grpc.h
error_cfstream.h
cfstream_handle.h
benchmark::Shutdown
void Shutdown()
Definition: benchmark/src/benchmark.cc:607
GPR_UNREACHABLE_CODE
#define GPR_UNREACHABLE_CODE(STATEMENT)
Definition: impl/codegen/port_platform.h:652
grpc_core::ExecCtx
Definition: exec_ctx.h:97
grpc_core::TraceFlag
Definition: debug/trace.h:63
grpc_core::TraceFlag::enabled
bool enabled()
Definition: debug/trace.h:82
gpr_atm
intptr_t gpr_atm
Definition: impl/codegen/atm_gcc_atomic.h:32
GRPC_ERROR_REF
#define GRPC_ERROR_REF(err)
Definition: error.h:261
grpc_error_set_int
grpc_error_handle grpc_error_set_int(grpc_error_handle src, grpc_error_ints which, intptr_t value)
Definition: error.cc:613
port.h
exec_ctx
grpc_core::ExecCtx exec_ctx
Definition: end2end_binder_transport_test.cc:75
regen-readme.line
line
Definition: regen-readme.py:30
exec_ctx.h
closure
Definition: proxy.cc:59
GRPC_ERROR_UNREF
#define GRPC_ERROR_UNREF(err)
Definition: error.h:262
handle
static csh handle
Definition: test_arm_regression.c:16
GPR_DEBUG
#define GPR_DEBUG
Definition: include/grpc/impl/codegen/log.h:55
gpr_ref_init
GPRAPI void gpr_ref_init(gpr_refcount *r, int n)
Definition: sync.cc:86
atm.h
asyncio_get_stats.type
type
Definition: asyncio_get_stats.py:37
CFStreamPhony
void CFStreamPhony()
Definition: cfstream_handle.cc:208
gpr_unref
GPRAPI int gpr_unref(gpr_refcount *r)
Definition: sync.cc:103
grpc_init
GRPCAPI void grpc_init(void)
Definition: init.cc:146
grpc_error
Definition: error_internal.h:42
testing::Ref
internal::RefMatcher< T & > Ref(T &x)
Definition: cares/cares/test/gmock-1.8.0/gmock/gmock.h:8628
sync.h
grpc_closure
Definition: closure.h:56
grpc_shutdown
GRPCAPI void grpc_shutdown(void)
Definition: init.cc:209
gpr_ref
GPRAPI void gpr_ref(gpr_refcount *r)
Definition: sync.cc:88
ev_apple.h
GRPC_ERROR_INT_GRPC_STATUS
@ GRPC_ERROR_INT_GRPC_STATUS
grpc status code representing this error
Definition: error.h:66
port_platform.h
stream
voidpf stream
Definition: bloaty/third_party/zlib/contrib/minizip/ioapi.h:136


grpc
Author(s):
autogenerated on Thu Mar 13 2025 02:58:43