23 #ifdef GRPC_CFSTREAM_ENDPOINT
25 #import <CoreFoundation/CoreFoundation.h>
44 struct CFStreamEndpoint {
48 CFReadStreamRef read_stream;
49 CFWriteStreamRef write_stream;
50 CFStreamHandle* stream_sync;
63 static void CFStreamFree(CFStreamEndpoint* ep) {
64 CFRelease(ep->read_stream);
65 CFRelease(ep->write_stream);
66 CFSTREAM_HANDLE_UNREF(ep->stream_sync,
"free");
// Reference-count helper macros, four-argument forwarding form: each expands
// to a call that passes the endpoint, the caller-supplied reason string, and
// the call site's __FILE__ and __LINE__ to CFStreamRef / CFStreamUnref.
// NOTE(review): presumably selected by a build-time conditional that is not
// visible in this excerpt -- confirm against the surrounding #if.
71 #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
72 #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
73 static void CFStreamUnref(CFStreamEndpoint* ep,
const char* reason,
78 "CFStream endpoint unref %p : %s %" PRIdPTR
" -> %" PRIdPTR, ep,
79 reason, val, val - 1);
85 static void CFStreamRef(CFStreamEndpoint* ep,
const char* reason,
90 "CFStream endpoint ref %p : %s %" PRIdPTR
" -> %" PRIdPTR, ep,
91 reason, val, val + 1);
// Reference-count helper macros, single-argument forwarding form: the
// `reason` parameter is accepted but does not appear in the expansion; only
// the endpoint is passed on to CFStreamRef / CFStreamUnref.
96 #define EP_REF(ep, reason) CFStreamRef((ep))
97 #define EP_UNREF(ep, reason) CFStreamUnref((ep))
98 static void CFStreamUnref(CFStreamEndpoint* ep) {
103 static void CFStreamRef(CFStreamEndpoint* ep) {
gpr_ref(&ep->refcount); }
107 CFStreamEndpoint* ep) {
117 ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg);
121 for (
i = 0;
i < ep->read_slices->count;
i++) {
130 ep->read_cb =
nullptr;
131 ep->read_slices =
nullptr;
138 ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg);
142 ep->write_cb =
nullptr;
143 ep->write_slices =
nullptr;
148 CFStreamEndpoint* ep =
static_cast<CFStreamEndpoint*
>(
arg);
153 EP_UNREF(ep,
"read");
164 CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
165 if (stream_error !=
nullptr) {
166 error = CFStreamAnnotateError(
167 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error,
"Read error"), ep);
168 CFRelease(stream_error);
172 CallReadCb(ep,
error);
173 EP_UNREF(ep,
"read");
177 CFStreamAnnotateError(
179 EP_UNREF(ep,
"read");
185 EP_UNREF(ep,
"read");
190 CFStreamEndpoint* ep =
static_cast<CFStreamEndpoint*
>(
arg);
195 EP_UNREF(ep,
"write");
201 CFIndex write_size = CFWriteStreamWrite(
203 if (write_size == -1) {
205 CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
206 if (stream_error !=
nullptr) {
207 error = CFStreamAnnotateError(
208 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error,
"write failed."), ep);
209 CFRelease(stream_error);
213 CallWriteCb(ep,
error);
214 EP_UNREF(ep,
"write");
220 if (ep->write_slices->length > 0) {
221 ep->stream_sync->NotifyOnWrite(&ep->write_action);
224 EP_UNREF(ep,
"write");
242 CFStreamEndpoint* ep_impl =
reinterpret_cast<CFStreamEndpoint*
>(ep);
244 gpr_log(
GPR_DEBUG,
"CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,
248 ep_impl->read_cb =
cb;
249 ep_impl->read_slices =
slices;
253 EP_REF(ep_impl,
"read");
254 ep_impl->stream_sync->NotifyOnRead(&ep_impl->read_action);
259 CFStreamEndpoint* ep_impl =
reinterpret_cast<CFStreamEndpoint*
>(ep);
265 ep_impl->write_cb =
cb;
266 ep_impl->write_slices =
slices;
267 EP_REF(ep_impl,
"write");
268 ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
272 CFStreamEndpoint* ep_impl =
reinterpret_cast<CFStreamEndpoint*
>(ep);
277 CFReadStreamClose(ep_impl->read_stream);
278 CFWriteStreamClose(ep_impl->write_stream);
279 ep_impl->stream_sync->Shutdown(why);
287 CFStreamEndpoint* ep_impl =
reinterpret_cast<CFStreamEndpoint*
>(ep);
291 EP_UNREF(ep_impl,
"destroy");
295 CFStreamEndpoint* ep_impl =
reinterpret_cast<CFStreamEndpoint*
>(ep);
296 return ep_impl->peer_string;
300 CFStreamEndpoint* ep_impl =
reinterpret_cast<CFStreamEndpoint*
>(ep);
301 return ep_impl->local_address;
306 bool CFStreamCanTrackErr(
grpc_endpoint* ep) {
return false; }
315 CFStreamAddToPollset,
316 CFStreamAddToPollsetSet,
317 CFStreamDeleteFromPollsetSet,
321 CFStreamGetLocalAddress,
323 CFStreamCanTrackErr};
325 grpc_endpoint* grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,
326 CFWriteStreamRef write_stream,
327 const char* peer_string,
328 CFStreamHandle* stream_sync) {
329 CFStreamEndpoint* ep_impl =
new CFStreamEndpoint;
332 "CFStream endpoint:%p create readStream:%p writeStream: %p",
333 ep_impl, read_stream, write_stream);
335 ep_impl->base.vtable = &
vtable;
337 ep_impl->read_stream = read_stream;
338 ep_impl->write_stream = write_stream;
339 CFRetain(read_stream);
340 CFRetain(write_stream);
341 ep_impl->stream_sync = stream_sync;
342 CFSTREAM_HANDLE_REF(ep_impl->stream_sync,
"endpoint create");
344 ep_impl->peer_string = peer_string;
346 resolved_local_addr.
len =
sizeof(resolved_local_addr.
addr);
347 CFDataRef native_handle =
static_cast<CFDataRef
>(CFReadStreamCopyProperty(
348 ep_impl->read_stream, kCFStreamPropertySocketNativeHandle));
349 CFSocketNativeHandle sockfd;
350 CFDataGetBytes(native_handle, CFRangeMake(0,
sizeof(CFSocketNativeHandle)),
353 CFRelease(native_handle);
356 if (getsockname(sockfd,
reinterpret_cast<sockaddr*
>(resolved_local_addr.
addr),
357 &resolved_local_addr.
len) < 0 ||
359 ep_impl->local_address =
"";
361 ep_impl->local_address = addr_uri.
value();
363 ep_impl->read_cb = nil;
364 ep_impl->write_cb = nil;
365 ep_impl->read_slices = nil;
366 ep_impl->write_slices = nil;
368 static_cast<void*
>(ep_impl), grpc_schedule_on_exec_ctx);
370 static_cast<void*
>(ep_impl), grpc_schedule_on_exec_ctx);
372 return &ep_impl->base;