webrtc.cpp
Go to the documentation of this file.
1 /*
2  * Includes code from https://github.com/centricular/gstwebrtc-demos/blob/master/sendrecv/gst/webrtc-sendrecv.c
3  * up-to-date to commit e4b86bc4f151e35222aff1bf7e46cec016e7b0ee 2020-6-25
4  */
5 
6 #include <gst/gst.h>
7 #include <gst/sdp/sdp.h>
8 #include <gst/app/app.h>
9 
10 #define GST_USE_UNSTABLE_API
11 #include <gst/webrtc/webrtc.h>
12 
13 /* For signalling */
14 #include <libsoup/soup.h>
15 #include <json-glib/json-glib.h>
16 
17 #include <string.h>
18 
19 #include <ros/ros.h>
22 #include <thread>
23 #include <std_msgs/String.h>
24 
25 enum AppState {
27  APP_STATE_ERROR = 1, /* generic error */
30  SERVER_CONNECTED, /* Ready to register */
33  SERVER_REGISTERED, /* Ready to call a peer */
34  SERVER_CLOSED, /* server connection closed by us or the server */
43 };
44 
46 
47 
48 static GMainLoop *loop;
49 static GstElement *pipe1, *webrtc1, *appsrc_;
50 static GObject *send_channel, *receive_channel;
51 
52 static SoupWebsocketConnection *ws_conn = nullptr;
54 static const gchar *peer_id = nullptr;
55 static const gchar *server_url = "wss://127.0.0.1:8443";
56 static gboolean disable_ssl = FALSE;
57 
58 // added
59 std::unique_ptr<image_transport::ImageTransport> image_transport_;
61 static GCancellable *serverConnection;
62 std::thread loop_thread_;
63 static int retries = 0;
64 static const int MAXRETRIES = 5;
65 gboolean serverConnected = FALSE;
66 gboolean serverConnectStarted = FALSE;
67 static const gchar *audio_device = nullptr;
68 static const gchar *video_width = nullptr;
69 static const gchar *video_height = nullptr;
70 static const gchar *video_bitrate = nullptr;
71 static const gchar *turnserver_login = nullptr;
72 static const gchar *turnserver_port = nullptr;
74 
75 static GOptionEntry entries[] =
76 {
77  { "peer-id", 0, 0, G_OPTION_ARG_STRING, &peer_id, "String ID of the peer to connect to", "ID" },
78  { "server", 0, 0, G_OPTION_ARG_STRING, &server_url, "Signalling server to connect to", "URL" },
79  { "disable-ssl", 0, 0, G_OPTION_ARG_NONE, &disable_ssl, "Disable ssl", NULL },
80  { "audio-device", 0, 0, G_OPTION_ARG_STRING, &audio_device, "String of the audio device number", "ID" },
81  { "video-width", 0, 0, G_OPTION_ARG_STRING, &video_width, "video width pixels", NULL },
82  { "video-height", 0, 0, G_OPTION_ARG_STRING, &video_height, "video height pixels", NULL },
83  { "video-bitrate", 0, 0, G_OPTION_ARG_STRING, &video_bitrate, "target bitrate kbps", NULL },
84  { "turnserver-port", 0, 0, G_OPTION_ARG_STRING, &turnserver_port, "TURN server port number", NULL },
85  { "turnserver-login", 0, 0, G_OPTION_ARG_STRING, &turnserver_login, "TURN server user:pass", NULL },
86  { NULL },
87 };
88 
89 static gboolean
90 cleanup_and_quit_loop (const gchar * msg, AppState state)
91 {
92  if (msg)
93  g_printerr ("%s\n", msg);
94  if (state > 0)
95  app_state = state;
96 
97  if (ws_conn) {
98  if (soup_websocket_connection_get_state (ws_conn) ==
99  SOUP_WEBSOCKET_STATE_OPEN)
100  /* This will call us again */
101  soup_websocket_connection_close (ws_conn, 1000, "");
102  else
103  g_object_unref (ws_conn);
104  }
105 
106  if (loop) {
107  g_main_loop_quit (loop);
108  //g_main_loop_unref(loop); // added - required?
109  loop = NULL;
110  g_print("g_main_loop quit\n");
111  }
112 
113  /* To allow usage as a GSourceFunc */
114  return G_SOURCE_REMOVE;
115 }
116 
117 static gchar*
118 get_string_from_json_object (JsonObject * object)
119 {
120  JsonNode *root;
121  JsonGenerator *generator;
122  gchar *text;
123 
124  /* Make it the root node */
125  root = json_node_init_object (json_node_alloc (), object);
126  generator = json_generator_new ();
127  json_generator_set_root (generator, root);
128  text = json_generator_to_data (generator, NULL);
129 
130  /* Release everything */
131  g_object_unref (generator);
132  json_node_free (root);
133  return text;
134 }
135 
136 static void
137 handle_media_stream (GstPad * pad, GstElement * pipe, const char * convert_name,
138  const char * sink_name)
139 {
140  GstPad *qpad;
141  GstElement *q, *conv, *resample, *sink;
142  GstPadLinkReturn ret;
143 
144  g_print ("Trying to handle stream with %s ! %s", convert_name, sink_name);
145 
146  q = gst_element_factory_make ("queue", NULL);
147  g_assert_nonnull (q);
148  conv = gst_element_factory_make (convert_name, NULL);
149  g_assert_nonnull (conv);
150  sink = gst_element_factory_make (sink_name, NULL);
151  g_assert_nonnull (sink);
152 
153  if (g_strcmp0 (convert_name, "audioconvert") == 0) {
154  /* Might also need to resample, so add it just in case.
155  * Will be a no-op if it's not required. */
156  resample = gst_element_factory_make ("audioresample", NULL);
157  g_assert_nonnull (resample);
158  gst_bin_add_many (GST_BIN (pipe), q, conv, resample, sink, NULL);
159  gst_element_sync_state_with_parent (q);
160  gst_element_sync_state_with_parent (conv);
161  gst_element_sync_state_with_parent (resample);
162  gst_element_sync_state_with_parent (sink);
163  gst_element_link_many (q, conv, resample, sink, NULL);
164  } else {
165  gst_bin_add_many (GST_BIN (pipe), q, conv, sink, NULL);
166  gst_element_sync_state_with_parent (q);
167  gst_element_sync_state_with_parent (conv);
168  gst_element_sync_state_with_parent (sink);
169  gst_element_link_many (q, conv, sink, NULL);
170  }
171 
172  qpad = gst_element_get_static_pad (q, "sink");
173 
174  ret = gst_pad_link (pad, qpad);
175  g_assert_cmphex (ret, ==, GST_PAD_LINK_OK);
176 }
177 
178 static void
179 on_incoming_decodebin_stream (GstElement * decodebin, GstPad * pad,
180  GstElement * pipe)
181 {
182  GstCaps *caps;
183  const gchar *name;
184 
185  if (!gst_pad_has_current_caps (pad)) {
186  g_printerr ("Pad '%s' has no caps, can't do anything, ignoring\n",
187  GST_PAD_NAME (pad));
188  return;
189  }
190 
191  caps = gst_pad_get_current_caps (pad);
192  name = gst_structure_get_name (gst_caps_get_structure (caps, 0));
193 
194  if (g_str_has_prefix (name, "video")) {
195  handle_media_stream (pad, pipe, "videoconvert", "autovideosink");
196  } else if (g_str_has_prefix (name, "audio")) {
197  handle_media_stream (pad, pipe, "audioconvert", "autoaudiosink");
198  } else {
199  g_printerr ("Unknown pad %s, ignoring", GST_PAD_NAME (pad));
200  }
201 }
202 
203 static void
204 on_incoming_stream (GstElement * webrtc, GstPad * pad, GstElement * pipe)
205 {
206  GstElement *decodebin;
207  GstPad *sinkpad;
208 
209  if (GST_PAD_DIRECTION (pad) != GST_PAD_SRC)
210  return;
211 
212  decodebin = gst_element_factory_make ("decodebin", NULL);
213  g_signal_connect (decodebin, "pad-added",
214  G_CALLBACK (on_incoming_decodebin_stream), pipe);
215  gst_bin_add (GST_BIN (pipe), decodebin);
216  gst_element_sync_state_with_parent (decodebin);
217 
218  sinkpad = gst_element_get_static_pad (decodebin, "sink");
219  gst_pad_link (pad, sinkpad);
220  gst_object_unref (sinkpad);
221 }
222 
223 static void
224 send_ice_candidate_message (GstElement * webrtc G_GNUC_UNUSED, guint mlineindex,
225  gchar * candidate, gpointer user_data G_GNUC_UNUSED)
226 {
227  gchar *text;
228  JsonObject *ice, *msg;
229 
231  cleanup_and_quit_loop ("Can't send ICE, not in call", APP_STATE_ERROR);
232  return;
233  }
234 
235  ice = json_object_new ();
236  json_object_set_string_member (ice, "candidate", candidate);
237  json_object_set_int_member (ice, "sdpMLineIndex", mlineindex);
238  msg = json_object_new ();
239  json_object_set_object_member (msg, "ice", ice);
240  text = get_string_from_json_object (msg);
241  json_object_unref (msg);
242 
243  soup_websocket_connection_send_text (ws_conn, text);
244  g_free (text);
245 }
246 
247 static void
248 send_sdp_to_peer (GstWebRTCSessionDescription * desc)
249 {
250  gchar *text;
251  JsonObject *msg, *sdp;
252 
254  cleanup_and_quit_loop ("Can't send SDP to peer, not in call",
256  return;
257  }
258 
259  text = gst_sdp_message_as_text (desc->sdp);
260  sdp = json_object_new ();
261 
262  if (desc->type == GST_WEBRTC_SDP_TYPE_OFFER) {
263  g_print ("Sending offer:\n%s\n", text);
264  json_object_set_string_member (sdp, "type", "offer");
265  } else if (desc->type == GST_WEBRTC_SDP_TYPE_ANSWER) {
266  g_print ("Sending answer:\n%s\n", text);
267  json_object_set_string_member (sdp, "type", "answer");
268  } else {
269  g_assert_not_reached ();
270  }
271 
272  json_object_set_string_member (sdp, "sdp", text);
273  g_free (text);
274 
275  msg = json_object_new ();
276  json_object_set_object_member (msg, "sdp", sdp);
277  text = get_string_from_json_object (msg);
278  json_object_unref (msg);
279 
280  soup_websocket_connection_send_text (ws_conn, text);
281  g_free (text);
282 }
283 
284 /* Offer created by our pipeline, to be sent to the peer */
285 static void
286 on_offer_created (GstPromise * promise, gpointer user_data)
287 {
288  GstWebRTCSessionDescription *offer = NULL;
289  const GstStructure *reply;
290 
291  g_assert_cmphex (app_state, ==, PEER_CALL_NEGOTIATING);
292 
293  g_assert_cmphex (gst_promise_wait(promise), ==, GST_PROMISE_RESULT_REPLIED);
294  reply = gst_promise_get_reply (promise);
295  gst_structure_get (reply, "offer",
296  GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
297  gst_promise_unref (promise);
298 
299  promise = gst_promise_new ();
300  g_signal_emit_by_name (webrtc1, "set-local-description", offer, promise);
301  gst_promise_interrupt (promise);
302  gst_promise_unref (promise);
303 
304  /* Send offer to peer */
305  send_sdp_to_peer (offer);
306  gst_webrtc_session_description_free (offer);
307 }
308 
309 static void
310 on_negotiation_needed (GstElement * element, gpointer user_data)
311 {
312  GstPromise *promise;
313 
315  promise = gst_promise_new_with_change_func (on_offer_created, user_data, NULL);
316  g_signal_emit_by_name (webrtc1, "create-offer", NULL, promise);
317 }
318 
319 static void
320 on_ice_gathering_state_notify (GstElement * webrtcbin, GParamSpec * pspec,
321  gpointer user_data)
322 {
323  GstWebRTCICEGatheringState ice_gather_state;
324  const gchar *new_state = "unknown";
325 
326  g_object_get (webrtcbin, "ice-gathering-state", &ice_gather_state, NULL);
327  switch (ice_gather_state) {
328  case GST_WEBRTC_ICE_GATHERING_STATE_NEW:
329  new_state = "new";
330  break;
331  case GST_WEBRTC_ICE_GATHERING_STATE_GATHERING:
332  new_state = "gathering";
333  break;
334  case GST_WEBRTC_ICE_GATHERING_STATE_COMPLETE:
335  new_state = "complete";
336  break;
337  }
338  g_print ("ICE gathering state changed to %s\n", new_state);
339 }
340 
341 
342 #define STUN_SERVER " stun-server=stun://stun.l.google.com:19302 "
343 #define RTP_CAPS_OPUS "application/x-rtp,media=audio,encoding-name=OPUS,payload="
344 #define RTP_CAPS_VP8 "application/x-rtp,media=video,encoding-name=VP8,payload="
345 
346 static gboolean start_pipeline (void)
347 {
348  GstStateChangeReturn ret;
349  GError *error = NULL;
350 
351  if(!audio_device) {
352  gchar *pl = g_strconcat ("webrtcbin bundle-policy=max-bundle name=sendrecv " STUN_SERVER
353  "turn-server=turn://", turnserver_login, "@127.0.0.1:", turnserver_port, " "
354  "videorate ! video/x-raw,width=", video_width, ",height=", video_height, ",framerate=15/1 ! "
355  "videoconvert ! queue ! vp8enc deadline=1 target-bitrate=", video_bitrate, "000 ! rtpvp8pay ! "
356  "queue ! " RTP_CAPS_VP8 "96 ! sendrecv. ", NULL);
357 
358  pipe1 = gst_parse_launch (pl, &error);
359  g_free(pl);
360  }
361  else {
362  gchar *pl = g_strconcat ("webrtcbin bundle-policy=max-bundle name=sendrecv " STUN_SERVER
363  "turn-server=turn://", turnserver_login, "@127.0.0.1:", turnserver_port, " "
364  "videorate ! video/x-raw,width=", video_width, ",height=", video_height, ",framerate=15/1 ! "
365  "videoconvert ! queue ! vp8enc deadline=1 target-bitrate=", video_bitrate, "000 ! rtpvp8pay ! "
366  "queue ! " RTP_CAPS_VP8 "96 ! sendrecv. "
367  "alsasrc device=hw:", audio_device, " ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! "
368  "queue ! " RTP_CAPS_OPUS "97 ! sendrecv. ", NULL);
369 
370  pipe1 = gst_parse_launch (pl, &error);
371  g_free(pl);
372  }
373 
374  if (error) {
375  g_printerr ("Failed to parse launch: %s\n", error->message);
376  g_error_free (error);
377  if (pipe1) g_clear_object (&pipe1);
378  return FALSE;
379  }
380 
381 
382  // init appsrc
383  appsrc_ = gst_element_factory_make("appsrc", "source");
384  if (appsrc_ == nullptr) {
385  ROS_ERROR("GST: failed to create appsrc!");
386  return FALSE;
387  }
388 
389  gst_app_src_set_stream_type(GST_APP_SRC_CAST(appsrc_), GST_APP_STREAM_TYPE_STREAM);
390  gst_app_src_set_latency(GST_APP_SRC_CAST(appsrc_), 0, -1);
391  g_object_set(GST_OBJECT(appsrc_),
392  "format", GST_FORMAT_TIME,
393  "is-live", true,
394  "max-bytes", 0,
395  "do-timestamp", true,
396  NULL);
397 
398  // find pipeline sink (where we may link appsrc)
399  GstPad *inpad = gst_bin_find_unlinked_pad(GST_BIN(pipe1), GST_PAD_SINK);
400  g_assert(inpad);
401 
402  GstElement *inelement = gst_pad_get_parent_element(inpad);
403  g_assert(inelement);
404  gst_object_unref(GST_OBJECT(inpad));
405  ROS_INFO("GST: inelement: %s", gst_element_get_name(inelement));
406 
407  if (!gst_bin_add(GST_BIN(pipe1), appsrc_)) {
408  ROS_ERROR("GST: gst_bin_add() failed!");
409  gst_object_unref(GST_OBJECT(pipe1));
410  gst_object_unref(GST_OBJECT(inelement));
411  return FALSE;
412  }
413 
414  if (!gst_element_link(appsrc_, inelement)) {
415  ROS_ERROR("GST: cannot link %s -> %s",
416  gst_element_get_name(appsrc_),
417  gst_element_get_name(inelement));
418  gst_object_unref(GST_OBJECT(pipe1));
419  gst_object_unref(GST_OBJECT(inelement));
420  return FALSE;
421  }
422 
423  gst_object_unref(GST_OBJECT(inelement));
424 
425 
426 
427 
428  webrtc1 = gst_bin_get_by_name (GST_BIN (pipe1), "sendrecv");
429  g_assert_nonnull (webrtc1);
430 
431  /* This is the gstwebrtc entry point where we create the offer and so on. It
432  * will be called when the pipeline goes to PLAYING. */
433  g_signal_connect (webrtc1, "on-negotiation-needed",
434  G_CALLBACK (on_negotiation_needed), NULL);
435  /* We need to transmit this ICE candidate to the browser via the websockets
436  * signalling server. Incoming ice candidates from the browser need to be
437  * added by us too, see on_server_message() */
438  g_signal_connect (webrtc1, "on-ice-candidate",
439  G_CALLBACK (send_ice_candidate_message), NULL);
440  g_signal_connect (webrtc1, "notify::ice-gathering-state",
441  G_CALLBACK (on_ice_gathering_state_notify), NULL);
442 
443  gst_element_set_state (pipe1, GST_STATE_READY);
444 
445  /* Incoming streams will be exposed via this signal */
446  g_signal_connect (webrtc1, "pad-added", G_CALLBACK (on_incoming_stream),
447  pipe1);
448  /* Lifetime is the same as the pipeline itself */
449  gst_object_unref (webrtc1);
450 
451  g_print ("Starting pipeline\n");
452  ret = gst_element_set_state (GST_ELEMENT (pipe1), GST_STATE_PLAYING);
453  if (ret == GST_STATE_CHANGE_FAILURE)
454  goto err;
455 
456  return TRUE;
457 
458 err:
459  g_print ("ERROR starting pipeline\n");
460  if (pipe1)
461  g_clear_object (&pipe1);
462  if (webrtc1)
463  webrtc1 = NULL;
464  return FALSE;
465 }
466 
467 static gboolean
469 {
470  gchar *msg;
471 
472  if (soup_websocket_connection_get_state (ws_conn) !=
473  SOUP_WEBSOCKET_STATE_OPEN)
474  return FALSE;
475 
476  if (!peer_id)
477  return FALSE;
478 
479  g_print ("Setting up signalling server call with %s\n", peer_id);
481  msg = g_strdup_printf ("SESSION %s", peer_id);
482  soup_websocket_connection_send_text (ws_conn, msg);
483  g_free (msg);
484  return TRUE;
485 }
486 
487 static gboolean
489 {
490  gchar *hello;
491  gint32 our_id;
492 
493  if (soup_websocket_connection_get_state (ws_conn) !=
494  SOUP_WEBSOCKET_STATE_OPEN)
495  return FALSE;
496 
497  our_id = g_random_int_range (10, 10000);
498  g_print ("Registering id %i with server\n", our_id);
500 
501  /* Register with the server with a random integer id. Reply will be received
502  * by on_server_message() */
503  hello = g_strdup_printf ("HELLO %i", our_id);
504  soup_websocket_connection_send_text (ws_conn, hello);
505  g_free (hello);
506 
507  return TRUE;
508 }
509 
510 static void
511 on_server_closed (SoupWebsocketConnection * conn G_GNUC_UNUSED,
512  gpointer user_data G_GNUC_UNUSED)
513 {
515  // cleanup_and_quit_loop ("Server connection closed", APP_STATE_UNKNOWN);
516  g_print ("Server connection closed, trying to continue anyway\n");
517 
518  std_msgs::String msg;
519  msg.data = "disconnected";
520  webrtcstatus_pub.publish(msg);
521 }
522 
523 /* One mega message handler for our asynchronous calling mechanism */
524 static void
525 on_server_message (SoupWebsocketConnection * conn, SoupWebsocketDataType type,
526  GBytes * message, gpointer user_data)
527 {
528  gchar *text;
529 
530  switch (type) {
531  case SOUP_WEBSOCKET_DATA_BINARY:
532  g_printerr ("Received unknown binary message, ignoring\n");
533  return;
534  case SOUP_WEBSOCKET_DATA_TEXT: {
535  gsize size;
536  const gchar *data = reinterpret_cast<const gchar*>(g_bytes_get_data (message, &size));
537  /* Convert to NULL-terminated string */
538  text = g_strndup (data, size);
539  break;
540  }
541  default:
542  g_assert_not_reached ();
543  }
544 
545  /* Server has accepted our registration, we are ready to send commands */
546  if (g_strcmp0 (text, "HELLO") == 0) {
547  if (app_state != SERVER_REGISTERING) {
548  cleanup_and_quit_loop ("ERROR: Received HELLO when not registering",
550  goto out;
551  }
553  g_print ("Registered with server\n");
554  /* Ask signalling server to connect us with a specific peer */
555  if (!setup_call ()) {
556  cleanup_and_quit_loop ("ERROR: Failed to setup call", PEER_CALL_ERROR);
557  goto out;
558  }
559  /* Call has been setup by the server, now we can start negotiation */
560  } else if (g_strcmp0 (text, "SESSION_OK") == 0) {
561  if (app_state != PEER_CONNECTING) {
562  cleanup_and_quit_loop ("ERROR: Received SESSION_OK when not calling",
564  goto out;
565  }
566 
568  /* Start negotiation (exchange SDP and ICE candidates) */
569  if (!start_pipeline ())
570  cleanup_and_quit_loop ("ERROR: failed to start pipeline",
572  /* Handle errors */
573  } else if (g_str_has_prefix (text, "ERROR")) {
574  switch (app_state) {
575  case SERVER_CONNECTING:
577  break;
578  case SERVER_REGISTERING:
580  break;
581  case PEER_CONNECTING:
583  break;
584  case PEER_CONNECTED:
587  break;
588  default:
590  }
592  /* Look for JSON messages containing SDP and ICE candidates */
593  } else {
594  JsonNode *root;
595  JsonObject *object, *child;
596  JsonParser *parser = json_parser_new ();
597  if (!json_parser_load_from_data (parser, text, -1, NULL)) {
598  g_printerr ("Unknown message '%s', ignoring", text);
599  g_object_unref (parser);
600  goto out;
601  }
602 
603  root = json_parser_get_root (parser);
604  if (!JSON_NODE_HOLDS_OBJECT (root)) {
605  g_printerr ("Unknown json message '%s', ignoring", text);
606  g_object_unref (parser);
607  goto out;
608  }
609 
610  object = json_node_get_object (root);
611  /* Check type of JSON message */
612  if (json_object_has_member (object, "sdp")) {
613  int ret;
614  GstSDPMessage *sdp;
615  const gchar *text, *sdptype;
616  GstWebRTCSessionDescription *answer;
617 
618  g_assert_cmphex (app_state, ==, PEER_CALL_NEGOTIATING);
619 
620  child = json_object_get_object_member (object, "sdp");
621 
622  if (!json_object_has_member (child, "type")) {
623  cleanup_and_quit_loop ("ERROR: received SDP without 'type'",
625  goto out;
626  }
627 
628  sdptype = json_object_get_string_member (child, "type");
629  /* In this example, we create the offer and receive one answer by default,
630  * but it's possible to comment out the offer creation and wait for an offer
631  * instead, so we handle either here.
632  *
633  * See tests/examples/webrtcbidirectional.c in gst-plugins-bad for another
634  * example how to handle offers from peers and reply with answers using webrtcbin. */
635  text = json_object_get_string_member (child, "sdp");
636  ret = gst_sdp_message_new (&sdp);
637  g_assert_cmphex (ret, ==, GST_SDP_OK);
638  ret = gst_sdp_message_parse_buffer ((guint8 *) text, strlen (text), sdp);
639  g_assert_cmphex (ret, ==, GST_SDP_OK);
640 
641  if (g_str_equal (sdptype, "answer")) {
642  g_print ("Received answer:\n%s\n", text);
643  answer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER,
644  sdp);
645  g_assert_nonnull (answer);
646 
647  /* Set remote description on our pipeline */
648  {
649  GstPromise *promise = gst_promise_new ();
650  g_signal_emit_by_name (webrtc1, "set-remote-description", answer,
651  promise);
652  gst_promise_interrupt (promise);
653  gst_promise_unref (promise);
654  }
656  }
657 
658  } else if (json_object_has_member (object, "ice")) {
659  const gchar *candidate;
660  gint sdpmlineindex;
661 
662  child = json_object_get_object_member (object, "ice");
663  candidate = json_object_get_string_member (child, "candidate");
664  sdpmlineindex = json_object_get_int_member (child, "sdpMLineIndex");
665 
666  /* Add ice candidate sent by remote peer */
667  g_signal_emit_by_name (webrtc1, "add-ice-candidate", sdpmlineindex,
668  candidate);
669  } else {
670  g_printerr ("Ignoring unknown JSON message:\n%s\n", text);
671  }
672  g_object_unref (parser);
673  }
674 
675 out:
676  g_free (text);
677 }
678 
679 static void
680 on_server_connected (SoupSession * session, GAsyncResult * res,
681  SoupMessage *msg)
682 {
683  serverConnected = TRUE;
684 
685  GError *error = NULL;
686 
687  ws_conn = soup_session_websocket_connect_finish (session, res, &error);
688  if (error) {
690  g_error_free (error);
691  return;
692  }
693 
694  g_assert_nonnull (ws_conn);
695 
697  g_print ("Connected to signalling server\n");
698 
699  g_signal_connect (ws_conn, "closed", G_CALLBACK (on_server_closed), NULL);
700  g_signal_connect (ws_conn, "message", G_CALLBACK (on_server_message), NULL);
701 
702  /* Register with the server so it knows about us and can accept commands */
704 
705  std_msgs::String rosmsg;
706  rosmsg.data = "connected";
707  webrtcstatus_pub.publish(rosmsg);
708 }
709 
710 /*
711  * Connect to the signalling server. This is the entrypoint for everything else.
712  */
713 static void
715 {
716  SoupLogger *logger;
717  SoupMessage *message;
718  SoupSession *session;
719  const char *https_aliases[] = {"wss", NULL};
720 
721  session = soup_session_new_with_options (SOUP_SESSION_SSL_STRICT, !disable_ssl,
722  SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
723  //SOUP_SESSION_SSL_CA_FILE, "/etc/ssl/certs/ca-bundle.crt",
724  SOUP_SESSION_HTTPS_ALIASES, https_aliases, NULL);
725 
726  // logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, -1);
727  // soup_session_add_feature (session, SOUP_SESSION_FEATURE (logger));
728  // g_object_unref (logger);
729 
730  message = soup_message_new (SOUP_METHOD_GET, server_url);
731 
732  gchar *msg = g_strconcat ("Connecting to server: ", server_url,"\n", NULL);
733  g_print (msg);
734  g_free(msg);
735 
736  serverConnection = g_cancellable_new ();
737 
738  /* Once connected, we will register */
739  soup_session_websocket_connect_async (session, message, NULL, NULL, serverConnection,
740  (GAsyncReadyCallback) on_server_connected, message);
741  // ^^ non blocking, sometimes fails initial connect
742 
744 }
745 
746 static gboolean
748 {
749  int i;
750  gboolean ret;
751  GstPlugin *plugin;
752  GstRegistry *registry;
753  const gchar *needed[] = { "opus", "vpx", "nice", "webrtc", "dtls", "srtp",
754  "rtpmanager", "videotestsrc", "audiotestsrc", NULL};
755 
756  registry = gst_registry_get ();
757  ret = TRUE;
758  for (i = 0; i < g_strv_length ((gchar **) needed); i++) {
759  plugin = gst_registry_find_plugin (registry, needed[i]);
760  if (!plugin) {
761  g_print ("Required gstreamer plugin '%s' not found\n", needed[i]);
762  ret = FALSE;
763  continue;
764  }
765  gst_object_unref (plugin);
766  }
767  return ret;
768 }
769 
770 GstCaps* gst_caps_new_from_image(const sensor_msgs::Image::ConstPtr &msg)
771 {
772  // http://gstreamer.freedesktop.org/data/doc/gstreamer/head/pwg/html/section-types-definitions.html
773  static const ros::M_string known_formats = {{
774  {enc::RGB8, "RGB"},
775  {enc::RGB16, "RGB16"},
776  {enc::RGBA8, "RGBA"},
777  {enc::RGBA16, "RGBA16"},
778  {enc::BGR8, "BGR"},
779  {enc::BGR16, "BGR16"},
780  {enc::BGRA8, "BGRA"},
781  {enc::BGRA16, "BGRA16"},
782  {enc::MONO8, "GRAY8"},
783  {enc::MONO16, "GRAY16_LE"},
784  }};
785 
786  if (msg->is_bigendian) {
787  ROS_ERROR("GST: big endian image format is not supported");
788  return nullptr;
789  }
790 
791  auto format = known_formats.find(msg->encoding);
792  if (format == known_formats.end()) {
793  ROS_ERROR("GST: image format '%s' unknown", msg->encoding.c_str());
794  return nullptr;
795  }
796 
797  return gst_caps_new_simple("video/x-raw",
798  "format", G_TYPE_STRING, format->second.c_str(),
799  "width", G_TYPE_INT, msg->width,
800  "height", G_TYPE_INT, msg->height,
801  "framerate", GST_TYPE_FRACTION, 0, 1, // 0/1 = dynamic
802  nullptr);
803 }
804 
805 GstSample* gst_sample_new_from_image(const sensor_msgs::Image::ConstPtr &msg)
806 {
807  auto buffer = gst_buffer_new_allocate(nullptr, msg->data.size(), nullptr);
808  g_assert(buffer);
809 
810  gst_buffer_fill(buffer, 0, msg->data.data(), msg->data.size());
811  GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_LIVE);
812 
813  auto caps = gst_caps_new_from_image(msg);
814  if (caps == nullptr) {
815  gst_object_unref(GST_OBJECT(buffer));
816  return nullptr;
817  }
818 
819  auto sample = gst_sample_new(buffer, caps, nullptr, nullptr);
820  gst_buffer_unref(buffer);
821  gst_caps_unref(caps);
822 
823  return sample;
824 }
825 
826 
827 
828 
830  serverConnectStarted = TRUE;
831 
832  loop = g_main_loop_new (NULL, FALSE);
833 
835 
836  loop_thread_ = std::thread(
837  [&]() {
838  g_main_loop_run (loop);
839  g_main_loop_unref (loop);
840  g_print("loop_thread_ exit\n");
841  });
842  // loop_thread_.detach();
843 
844 }
845 
846 void image_cb(const sensor_msgs::Image::ConstPtr &msg)
847 {
848  // ROS_INFO("Image: %d x %d, stamp %f", msg->width, msg->height, msg->header.stamp.toSec());
849  if (!serverConnectStarted) {
851  return;
852  }
853 
854  if (!pipe1) return;
855 
856  auto sample = gst_sample_new_from_image(msg);
857  if (sample == nullptr)
858  return;
859 
860  auto push_ret = gst_app_src_push_sample(GST_APP_SRC_CAST(appsrc_), sample);
861  gst_sample_unref(sample);
862 }
863 
864 
865 int main(int argc, char **argv)
866 {
867  // added
868  ros::init(argc, argv, "webrtc");
870 
872  ROS_INFO("INIT webrtc");
873 
874  image_sub_ = image_transport_->subscribe("image_raw", 10, image_cb);
875 
876  GOptionContext *context;
877  GError *error = NULL;
878 
879  context = g_option_context_new ("- gstreamer webrtc sendrecv");
880  g_option_context_add_main_entries (context, entries, NULL);
881  g_option_context_add_group (context, gst_init_get_option_group ());
882  if (!g_option_context_parse (context, &argc, &argv, &error)) {
883  g_printerr ("Error initializing: %s\n", error->message);
884  return -1;
885  }
886 
887  if (!check_plugins ())
888  return -1;
889 
890 
891  /* Disable ssl when running a localhost server, because
892  * it's probably a test server with a self-signed certificate */
893 
894  GstUri *uri = gst_uri_from_string (server_url);
895  if (g_strcmp0 ("localhost", gst_uri_get_host (uri)) == 0 ||
896  g_strcmp0 ("127.0.0.1", gst_uri_get_host (uri)) == 0)
897  disable_ssl = true;
898  gst_uri_unref (uri);
899 
900  double start = 0;
901 
902  webrtcstatus_pub = n.advertise<std_msgs::String>("webrtcstatus", 10);
903 
904  std_msgs::String msg;
905  msg.data = "waiting";
906  webrtcstatus_pub.publish(msg);
907 
908  ros::Rate r(100); // 100 hz
909  while (ros::ok())
910  {
911  ros::spinOnce();
912  r.sleep();
913 
914  if (loop == NULL && serverConnectStarted) break;
915 
916  if (serverConnectStarted && start==0) start = ros::Time::now().toSec();
917 
918  if (start !=0 && !serverConnected && ros::Time::now().toSec() - start > 2) {
919  g_printerr("server connect failure, exit\n");
920  return -1;
921  }
922 
923  }
924 
925  // cleanup_and_quit_loop("ros shutdown", APP_STATE_UNKNOWN); // shutdown GMainLoop
926 
927  if (pipe1) {
928  gst_element_set_state (GST_ELEMENT (pipe1), GST_STATE_NULL);
929  g_print ("Pipeline stopped\n");
930  gst_object_unref (pipe1);
931  }
932 
933  return 0;
934 }
static void on_server_connected(SoupSession *session, GAsyncResult *res, SoupMessage *msg)
Definition: webrtc.cpp:680
static GMainLoop * loop
Definition: webrtc.cpp:48
static int retries
Definition: webrtc.cpp:63
static void send_sdp_to_peer(GstWebRTCSessionDescription *desc)
Definition: webrtc.cpp:248
std::unique_ptr< image_transport::ImageTransport > image_transport_
Definition: webrtc.cpp:59
int main(int argc, char **argv)
Definition: webrtc.cpp:865
void publish(const boost::shared_ptr< M > &message) const
static GObject * receive_channel
Definition: webrtc.cpp:50
#define STUN_SERVER
Definition: webrtc.cpp:342
static const int MAXRETRIES
Definition: webrtc.cpp:64
static AppState app_state
Definition: webrtc.cpp:53
static GObject * send_channel
Definition: webrtc.cpp:50
static const gchar * video_height
Definition: webrtc.cpp:69
bool err
#define RTP_CAPS_OPUS
Definition: webrtc.cpp:343
static const gchar * video_bitrate
Definition: webrtc.cpp:70
ROSCPP_DECL void init(int &argc, char **argv, const std::string &name, uint32_t options=0)
static void on_ice_gathering_state_notify(GstElement *webrtcbin, GParamSpec *pspec, gpointer user_data)
Definition: webrtc.cpp:320
static const gchar * video_width
Definition: webrtc.cpp:68
GstCaps * gst_caps_new_from_image(const sensor_msgs::Image::ConstPtr &msg)
Definition: webrtc.cpp:770
AppState
Definition: webrtc.cpp:25
static GstElement * appsrc_
Definition: webrtc.cpp:49
static GOptionEntry entries[]
Definition: webrtc.cpp:75
static void on_server_closed(SoupWebsocketConnection *conn G_GNUC_UNUSED, gpointer user_data G_GNUC_UNUSED)
Definition: webrtc.cpp:511
static void connect_to_websocket_server_async(void)
Definition: webrtc.cpp:714
std::map< std::string, std::string > M_string
static gboolean cleanup_and_quit_loop(const gchar *msg, AppState state)
Definition: webrtc.cpp:90
void resample(std::vector< int > &indexes, const WeightVector &weights, unsigned int nparticles=0)
static GstElement * pipe1
Definition: webrtc.cpp:49
GstSample * gst_sample_new_from_image(const sensor_msgs::Image::ConstPtr &msg)
Definition: webrtc.cpp:805
static void handle_media_stream(GstPad *pad, GstElement *pipe, const char *convert_name, const char *sink_name)
Definition: webrtc.cpp:137
static const gchar * turnserver_port
Definition: webrtc.cpp:72
#define RTP_CAPS_VP8
Definition: webrtc.cpp:344
gboolean serverConnectStarted
Definition: webrtc.cpp:66
#define ROS_INFO(...)
std::thread loop_thread_
Definition: webrtc.cpp:62
ROSCPP_DECL bool ok()
image_transport::Subscriber image_sub_
Definition: webrtc.cpp:60
static void on_incoming_stream(GstElement *webrtc, GstPad *pad, GstElement *pipe)
Definition: webrtc.cpp:204
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
static gboolean disable_ssl
Definition: webrtc.cpp:56
static SoupWebsocketConnection * ws_conn
Definition: webrtc.cpp:52
static const gchar * turnserver_login
Definition: webrtc.cpp:71
static void on_server_message(SoupWebsocketConnection *conn, SoupWebsocketDataType type, GBytes *message, gpointer user_data)
Definition: webrtc.cpp:525
bool sleep()
static gboolean start_pipeline(void)
Definition: webrtc.cpp:346
void image_cb(const sensor_msgs::Image::ConstPtr &msg)
Definition: webrtc.cpp:846
static void on_incoming_decodebin_stream(GstElement *decodebin, GstPad *pad, GstElement *pipe)
Definition: webrtc.cpp:179
static Time now()
static void send_ice_candidate_message(GstElement *webrtc G_GNUC_UNUSED, guint mlineindex, gchar *candidate, gpointer user_data G_GNUC_UNUSED)
Definition: webrtc.cpp:224
static const gchar * server_url
Definition: webrtc.cpp:55
static gboolean check_plugins(void)
Definition: webrtc.cpp:747
static gboolean setup_call(void)
Definition: webrtc.cpp:468
static gchar * get_string_from_json_object(JsonObject *object)
Definition: webrtc.cpp:118
static GstElement * webrtc1
Definition: webrtc.cpp:49
static void on_negotiation_needed(GstElement *element, gpointer user_data)
Definition: webrtc.cpp:310
#define n
static void on_offer_created(GstPromise *promise, gpointer user_data)
Definition: webrtc.cpp:286
ROSCPP_DECL void spinOnce()
static GCancellable * serverConnection
Definition: webrtc.cpp:61
static const gchar * audio_device
Definition: webrtc.cpp:67
#define ROS_ERROR(...)
void startLoopAndConnect()
Definition: webrtc.cpp:829
static const gchar * peer_id
Definition: webrtc.cpp:54
static ros::Publisher webrtcstatus_pub
Definition: webrtc.cpp:73
static gboolean register_with_server(void)
Definition: webrtc.cpp:488
gboolean serverConnected
Definition: webrtc.cpp:65


oculusprime
Author(s): Colin Adamson
autogenerated on Wed Mar 10 2021 03:14:59