evhiperfifo.c
Go to the documentation of this file.
1 /***************************************************************************
2  * _ _ ____ _
3  * Project ___| | | | _ \| |
4  * / __| | | | |_) | |
5  * | (__| |_| | _ <| |___
6  * \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) 1998 - 2017, Daniel Stenberg, <daniel@haxx.se>, et al.
9  *
10  * This software is licensed as described in the file COPYING, which
11  * you should have received as part of this distribution. The terms
12  * are also available at https://curl.haxx.se/docs/copyright.html.
13  *
14  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15  * copies of the Software, and permit persons to whom the Software is
16  * furnished to do so, under the terms of the COPYING file.
17  *
18  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19  * KIND, either express or implied.
20  *
21  ***************************************************************************/
22 /* <DESC>
23  * multi socket interface together with libev
24  * </DESC>
25  */
26 /* Example application source code using the multi socket interface to
27  * download many files at once.
28  *
29  * This example features the same basic functionality as hiperfifo.c does,
30  * but this uses libev instead of libevent.
31  *
32  * Written by Jeff Pohlmeyer, converted to use libev by Markus Koetter
33 
34 Requires libev and a (POSIX?) system that has mkfifo().
35 
36 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
37 sample programs.
38 
39 When running, the program creates the named pipe "hiper.fifo"
40 
41 Whenever there is input into the fifo, the program reads the input as a list
42 of URLs and creates a new easy handle to fetch each URL via the
43 curl_multi "hiper" API.
44 
45 
46 Thus, you can try a single URL:
47  % echo http://www.yahoo.com > hiper.fifo
48 
49 Or a whole bunch of them:
50  % cat my-url-list > hiper.fifo
51 
52 The fifo buffer is handled almost instantly, so you can even add more URLs
53 while the previous requests are still being downloaded.
54 
55 Note:
56  For the sake of simplicity, URL length is limited to 1023 characters!
57 
58 This is purely a demo app, all retrieved data is simply discarded by the write
59 callback.
60 
61 */
62 
63 #include <stdio.h>
64 #include <string.h>
65 #include <stdlib.h>
66 #include <sys/time.h>
67 #include <time.h>
68 #include <unistd.h>
69 #include <sys/poll.h>
70 #include <curl/curl.h>
71 #include <ev.h>
72 #include <fcntl.h>
73 #include <sys/stat.h>
74 #include <errno.h>
75 
/* Debug trace helper: forwards its arguments unchanged to printf().
   Uses the standard C99 variadic form instead of the GNU-only "x..." named
   variadic parameter. */
#define DPRINT(...) printf(__VA_ARGS__)

#define MSG_OUT stdout /* Send info to stdout, change to stderr if you want */
79 
80 
81 /* Global information, common to all connections */
82 typedef struct _GlobalInfo
83 {
84  struct ev_loop *loop;
85  struct ev_io fifo_event;
86  struct ev_timer timer_event;
87  CURLM *multi;
88  int still_running;
89  FILE *input;
90 } GlobalInfo;
91 
92 
93 /* Information associated with a specific easy handle */
94 typedef struct _ConnInfo
95 {
96  CURL *easy;
97  char *url;
98  GlobalInfo *global;
99  char error[CURL_ERROR_SIZE];
100 } ConnInfo;
101 
102 
103 /* Information associated with a specific socket */
104 typedef struct _SockInfo
105 {
108  int action;
109  long timeout;
110  struct ev_io ev;
111  int evset;
113 } SockInfo;
114 
115 static void timer_cb(EV_P_ struct ev_timer *w, int revents);
116 
117 /* Update the event timer after curl_multi library calls */
118 static int multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo *g)
119 {
120  DPRINT("%s %li\n", __PRETTY_FUNCTION__, timeout_ms);
121  ev_timer_stop(g->loop, &g->timer_event);
122  if(timeout_ms > 0) {
123  double t = timeout_ms / 1000;
124  ev_timer_init(&g->timer_event, timer_cb, t, 0.);
125  ev_timer_start(g->loop, &g->timer_event);
126  }
127  else if(timeout_ms == 0)
128  timer_cb(g->loop, &g->timer_event, 0);
129  return 0;
130 }
131 
132 /* Die if we get a bad CURLMcode somewhere */
133 static void mcode_or_die(const char *where, CURLMcode code)
134 {
135  if(CURLM_OK != code) {
136  const char *s;
137  switch(code) {
138  case CURLM_BAD_HANDLE:
139  s = "CURLM_BAD_HANDLE";
140  break;
142  s = "CURLM_BAD_EASY_HANDLE";
143  break;
144  case CURLM_OUT_OF_MEMORY:
145  s = "CURLM_OUT_OF_MEMORY";
146  break;
148  s = "CURLM_INTERNAL_ERROR";
149  break;
151  s = "CURLM_UNKNOWN_OPTION";
152  break;
153  case CURLM_LAST:
154  s = "CURLM_LAST";
155  break;
156  default:
157  s = "CURLM_unknown";
158  break;
159  case CURLM_BAD_SOCKET:
160  s = "CURLM_BAD_SOCKET";
161  fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
162  /* ignore this error */
163  return;
164  }
165  fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
166  exit(code);
167  }
168 }
169 
170 
171 
172 /* Check for completed transfers, and remove their easy handles */
174 {
175  char *eff_url;
176  CURLMsg *msg;
177  int msgs_left;
178  ConnInfo *conn;
179  CURL *easy;
180  CURLcode res;
181 
182  fprintf(MSG_OUT, "REMAINING: %d\n", g->still_running);
183  while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
184  if(msg->msg == CURLMSG_DONE) {
185  easy = msg->easy_handle;
186  res = msg->data.result;
187  curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
188  curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
189  fprintf(MSG_OUT, "DONE: %s => (%d) %s\n", eff_url, res, conn->error);
191  free(conn->url);
192  curl_easy_cleanup(easy);
193  free(conn);
194  }
195  }
196 }
197 
198 
199 
200 /* Called by libevent when we get action on a multi socket */
201 static void event_cb(EV_P_ struct ev_io *w, int revents)
202 {
203  DPRINT("%s w %p revents %i\n", __PRETTY_FUNCTION__, w, revents);
204  GlobalInfo *g = (GlobalInfo*) w->data;
205  CURLMcode rc;
206 
207  int action = (revents&EV_READ?CURL_POLL_IN:0)|
208  (revents&EV_WRITE?CURL_POLL_OUT:0);
210  mcode_or_die("event_cb: curl_multi_socket_action", rc);
211  check_multi_info(g);
212  if(g->still_running <= 0) {
213  fprintf(MSG_OUT, "last transfer done, kill timeout\n");
214  ev_timer_stop(g->loop, &g->timer_event);
215  }
216 }
217 
218 /* Called by libevent when our timeout expires */
219 static void timer_cb(EV_P_ struct ev_timer *w, int revents)
220 {
221  DPRINT("%s w %p revents %i\n", __PRETTY_FUNCTION__, w, revents);
222 
223  GlobalInfo *g = (GlobalInfo *)w->data;
224  CURLMcode rc;
225 
227  &g->still_running);
228  mcode_or_die("timer_cb: curl_multi_socket_action", rc);
229  check_multi_info(g);
230 }
231 
232 /* Clean up the SockInfo structure */
233 static void remsock(SockInfo *f, GlobalInfo *g)
234 {
235  printf("%s \n", __PRETTY_FUNCTION__);
236  if(f) {
237  if(f->evset)
238  ev_io_stop(g->loop, &f->ev);
239  free(f);
240  }
241 }
242 
243 
244 
245 /* Assign information to a SockInfo structure */
246 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
247  GlobalInfo *g)
248 {
249  printf("%s \n", __PRETTY_FUNCTION__);
250 
251  int kind = (act&CURL_POLL_IN?EV_READ:0)|(act&CURL_POLL_OUT?EV_WRITE:0);
252 
253  f->sockfd = s;
254  f->action = act;
255  f->easy = e;
256  if(f->evset)
257  ev_io_stop(g->loop, &f->ev);
258  ev_io_init(&f->ev, event_cb, f->sockfd, kind);
259  f->ev.data = g;
260  f->evset = 1;
261  ev_io_start(g->loop, &f->ev);
262 }
263 
264 
265 
266 /* Initialize a new SockInfo structure */
267 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
268 {
269  SockInfo *fdp = calloc(sizeof(SockInfo), 1);
270 
271  fdp->global = g;
272  setsock(fdp, s, easy, action, g);
273  curl_multi_assign(g->multi, s, fdp);
274 }
275 
276 /* CURLMOPT_SOCKETFUNCTION */
277 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
278 {
279  DPRINT("%s e %p s %i what %i cbp %p sockp %p\n",
280  __PRETTY_FUNCTION__, e, s, what, cbp, sockp);
281 
282  GlobalInfo *g = (GlobalInfo*) cbp;
283  SockInfo *fdp = (SockInfo*) sockp;
284  const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE"};
285 
287  "socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
288  if(what == CURL_POLL_REMOVE) {
289  fprintf(MSG_OUT, "\n");
290  remsock(fdp, g);
291  }
292  else {
293  if(!fdp) {
294  fprintf(MSG_OUT, "Adding data: %s\n", whatstr[what]);
295  addsock(s, e, what, g);
296  }
297  else {
299  "Changing action from %s to %s\n",
300  whatstr[fdp->action], whatstr[what]);
301  setsock(fdp, s, e, what, g);
302  }
303  }
304  return 0;
305 }
306 
307 
/*
 * CURLOPT_WRITEFUNCTION
 *
 * Discards the received data. Returns size * nmemb, i.e. reports every byte
 * as consumed.
 */
static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
{
  (void)ptr;
  (void)data;
  return size * nmemb;
}
317 
318 
319 /* CURLOPT_PROGRESSFUNCTION */
320 static int prog_cb(void *p, double dltotal, double dlnow, double ult,
321  double uln)
322 {
323  ConnInfo *conn = (ConnInfo *)p;
324  (void)ult;
325  (void)uln;
326 
327  fprintf(MSG_OUT, "Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
328  return 0;
329 }
330 
331 
332 /* Create a new easy handle, and add it to the global curl_multi */
333 static void new_conn(char *url, GlobalInfo *g)
334 {
335  ConnInfo *conn;
336  CURLMcode rc;
337 
338  conn = calloc(1, sizeof(ConnInfo));
339  memset(conn, 0, sizeof(ConnInfo));
340  conn->error[0]='\0';
341 
342  conn->easy = curl_easy_init();
343  if(!conn->easy) {
344  fprintf(MSG_OUT, "curl_easy_init() failed, exiting!\n");
345  exit(2);
346  }
347  conn->global = g;
348  conn->url = strdup(url);
349  curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
350  curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
351  curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, conn);
352  curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, 1L);
353  curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
354  curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
355  curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, 0L);
356  curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
357  curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
358  curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 3L);
359  curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 10L);
360 
362  "Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
363  rc = curl_multi_add_handle(g->multi, conn->easy);
364  mcode_or_die("new_conn: curl_multi_add_handle", rc);
365 
366  /* note that the add_handle() will set a time-out to trigger very soon so
367  that the necessary socket_action() call will be called by this app */
368 }
369 
370 /* This gets called whenever data is received from the fifo */
371 static void fifo_cb(EV_P_ struct ev_io *w, int revents)
372 {
373  char s[1024];
374  long int rv = 0;
375  int n = 0;
376  GlobalInfo *g = (GlobalInfo *)w->data;
377 
378  do {
379  s[0]='\0';
380  rv = fscanf(g->input, "%1023s%n", s, &n);
381  s[n]='\0';
382  if(n && s[0]) {
383  new_conn(s, g); /* if we read a URL, go get it! */
384  }
385  else
386  break;
387  } while(rv != EOF);
388 }
389 
390 /* Create a named pipe and tell libevent to monitor it */
391 static int init_fifo(GlobalInfo *g)
392 {
393  struct stat st;
394  static const char *fifo = "hiper.fifo";
395  curl_socket_t sockfd;
396 
397  fprintf(MSG_OUT, "Creating named pipe \"%s\"\n", fifo);
398  if(lstat (fifo, &st) == 0) {
399  if((st.st_mode & S_IFMT) == S_IFREG) {
400  errno = EEXIST;
401  perror("lstat");
402  exit(1);
403  }
404  }
405  unlink(fifo);
406  if(mkfifo (fifo, 0600) == -1) {
407  perror("mkfifo");
408  exit(1);
409  }
410  sockfd = open(fifo, O_RDWR | O_NONBLOCK, 0);
411  if(sockfd == -1) {
412  perror("open");
413  exit(1);
414  }
415  g->input = fdopen(sockfd, "r");
416 
417  fprintf(MSG_OUT, "Now, pipe some URL's into > %s\n", fifo);
418  ev_io_init(&g->fifo_event, fifo_cb, sockfd, EV_READ);
419  ev_io_start(g->loop, &g->fifo_event);
420  return (0);
421 }
422 
423 int main(int argc, char **argv)
424 {
425  GlobalInfo g;
426  CURLMcode rc;
427  (void)argc;
428  (void)argv;
429 
430  memset(&g, 0, sizeof(GlobalInfo));
431  g.loop = ev_default_loop(0);
432 
433  init_fifo(&g);
434  g.multi = curl_multi_init();
435 
436  ev_timer_init(&g.timer_event, timer_cb, 0., 0.);
437  g.timer_event.data = &g;
438  g.fifo_event.data = &g;
439  curl_multi_setopt(g.multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
440  curl_multi_setopt(g.multi, CURLMOPT_SOCKETDATA, &g);
441  curl_multi_setopt(g.multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
442  curl_multi_setopt(g.multi, CURLMOPT_TIMERDATA, &g);
443 
444  /* we don't call any curl_multi_socket*() function yet as we have no handles
445  added! */
446 
447  ev_loop(g.loop, 0);
449  return 0;
450 }
#define free(ptr)
Definition: curl_memory.h:130
struct _ConnInfo ConnInfo
static void new_conn(char *url, GlobalInfo *g)
Definition: evhiperfifo.c:333
curl_socket_t sockfd
Definition: evhiperfifo.c:106
CURLM * multi
Definition: asiohiper.cpp:65
char * url
Definition: asiohiper.cpp:73
int main(int argc, char **argv)
Definition: evhiperfifo.c:423
FILE * input
Definition: evhiperfifo.c:89
struct ev_timer timer_event
Definition: evhiperfifo.c:86
CURL * easy
Definition: evhiperfifo.c:107
static void fifo_cb(EV_P_ struct ev_io *w, int revents)
Definition: evhiperfifo.c:371
GlobalInfo * global
Definition: evhiperfifo.c:112
CURL_EXTERN CURLMcode curl_multi_add_handle(CURLM *multi_handle, CURL *curl_handle)
static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act, GlobalInfo *g)
Definition: evhiperfifo.c:246
static void event_cb(EV_P_ struct ev_io *w, int revents)
Definition: evhiperfifo.c:201
long timeout
Definition: evhiperfifo.c:109
int stat(const char *path, struct stat *buffer)
static void timer_cb(EV_P_ struct ev_timer *w, int revents)
Definition: evhiperfifo.c:219
static void check_multi_info(GlobalInfo *g)
Definition: evhiperfifo.c:173
CURL_EXTERN CURLMcode curl_multi_assign(CURLM *multi_handle, curl_socket_t sockfd, void *sockp)
#define strdup(ptr)
Definition: curl_memory.h:122
CURL * easy_handle
Definition: multi.h:95
XmlRpcServer s
UNITTEST_START char * ptr
Definition: unit1330.c:38
CURLcode
Definition: curl.h:454
CURL * easy
Definition: asiohiper.cpp:72
Definition: multi.h:93
struct ev_io ev
Definition: evhiperfifo.c:110
static int res
static int multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo *g)
Definition: evhiperfifo.c:118
static int prog_cb(void *p, double dltotal, double dlnow, double ult, double uln)
Definition: evhiperfifo.c:320
static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
Definition: evhiperfifo.c:267
#define CURL_POLL_OUT
Definition: multi.h:258
geometry_msgs::TransformStamped t
#define curl_easy_setopt(handle, option, value)
Definition: typecheck-gcc.h:41
const char ** p
Definition: unit1394.c:76
CURL_EXTERN CURLM * curl_multi_init(void)
Definition: multi.c:355
static void remsock(SockInfo *f, GlobalInfo *g)
Definition: evhiperfifo.c:233
#define curl_multi_setopt(handle, opt, param)
#define curl_easy_getinfo(handle, info, arg)
UNITTEST_START int rc
Definition: unit1301.c:31
#define CURL_POLL_REMOVE
Definition: multi.h:260
#define printf
Definition: curl_printf.h:40
#define CURL_POLL_IN
Definition: multi.h:257
CURL_EXTERN CURL * curl_easy_init(void)
Definition: easy.c:343
CURLMSG msg
Definition: multi.h:94
CURL_EXTERN void curl_easy_cleanup(CURL *curl)
struct ev_loop * loop
Definition: evhiperfifo.c:84
static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
Definition: evhiperfifo.c:277
action
#define MSG_OUT
Definition: evhiperfifo.c:78
CURL_EXTERN CURLMcode curl_multi_remove_handle(CURLM *multi_handle, CURL *curl_handle)
CURLMcode
Definition: multi.h:61
#define CURL_SOCKET_TIMEOUT
Definition: multi.h:262
int still_running
Definition: asiohiper.cpp:66
char error[CURL_ERROR_SIZE]
Definition: asiohiper.cpp:75
void CURLM
Definition: multi.h:58
struct ev_io fifo_event
Definition: evhiperfifo.c:85
GlobalInfo * global
Definition: asiohiper.cpp:74
static int init_fifo(GlobalInfo *g)
Definition: evhiperfifo.c:391
void CURL
Definition: curl.h:102
static const char * fifo
Definition: hiperfifo.c:381
CURL_EXTERN CURLMsg * curl_multi_info_read(CURLM *multi_handle, int *msgs_in_queue)
#define DPRINT(x...)
Definition: evhiperfifo.c:76
static void mcode_or_die(const char *where, CURLMcode code)
Definition: evhiperfifo.c:133
struct _SockInfo SockInfo
Definition: multi.h:64
union CURLMsg::@6 data
#define CURL_ERROR_SIZE
Definition: curl.h:724
size_t size
Definition: unit1302.c:52
#define fprintf
Definition: curl_printf.h:41
static CURL * easy[MAX_EASY_HANDLES]
struct _GlobalInfo GlobalInfo
CURLcode result
Definition: multi.h:98
int curl_socket_t
Definition: curl.h:130
CURL_EXTERN CURLMcode curl_multi_socket_action(CURLM *multi_handle, curl_socket_t s, int ev_bitmask, int *running_handles)
Definition: debug.c:29
CURL_EXTERN CURLMcode curl_multi_cleanup(CURLM *multi_handle)
static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
Definition: evhiperfifo.c:309
#define calloc(nbelem, size)
Definition: curl_memory.h:126


rc_tagdetect_client
Author(s): Monika Florek-Jasinska , Raphael Schaller
autogenerated on Sat Feb 13 2021 03:42:09