#include <string.h>

#include <gst/sdp/sdp.h>
#include <gst/app/app.h>

#define GST_USE_UNSTABLE_API
#include <gst/webrtc/webrtc.h>

#include <libsoup/soup.h>
#include <json-glib/json-glib.h>

#include <ros/ros.h>
#include <image_transport/image_transport.h>
#include <sensor_msgs/Image.h>
#include <sensor_msgs/image_encodings.h>
#include <std_msgs/String.h>

namespace enc = sensor_msgs::image_encodings;

static GMainLoop *loop;
static GstElement *pipe1, *webrtc1;
static GstElement *appsrc_;
static AppState app_state;
static SoupWebsocketConnection *ws_conn = nullptr;
static GCancellable *serverConnection;
static gboolean serverConnectStarted;

static const gchar *peer_id;
static const gchar *server_url;
static gboolean disable_ssl;
static const gchar *audio_device;
static const gchar *video_width, *video_height, *video_bitrate;
static const gchar *turnserver_port, *turnserver_login;

static ros::Publisher webrtcstatus_pub;
static std::unique_ptr<image_transport::ImageTransport> image_transport_;
static image_transport::Subscriber image_sub_;
static GOptionEntry entries[] = {
  { "peer-id", 0, 0, G_OPTION_ARG_STRING, &peer_id,
      "String ID of the peer to connect to", "ID" },
  { "server", 0, 0, G_OPTION_ARG_STRING, &server_url,
      "Signalling server to connect to", "URL" },
  { "disable-ssl", 0, 0, G_OPTION_ARG_NONE, &disable_ssl,
      "Disable ssl", NULL },
  { "audio-device", 0, 0, G_OPTION_ARG_STRING, &audio_device,
      "String of the audio device number", "ID" },
  { "video-width", 0, 0, G_OPTION_ARG_STRING, &video_width,
      "video width pixels", NULL },
  { "video-height", 0, 0, G_OPTION_ARG_STRING, &video_height,
      "video height pixels", NULL },
  { "video-bitrate", 0, 0, G_OPTION_ARG_STRING, &video_bitrate,
      "target bitrate kbps", NULL },
  { "turnserver-port", 0, 0, G_OPTION_ARG_STRING, &turnserver_port,
      "TURN server port number", NULL },
  { "turnserver-login", 0, 0, G_OPTION_ARG_STRING, &turnserver_login,
      "TURN server user:pass", NULL },
  { NULL }
};
static gboolean
cleanup_and_quit_loop (const gchar * msg, AppState state)
{
  if (msg)
    g_printerr ("%s\n", msg);
  if (state > 0)
    app_state = state;

  if (ws_conn &&
      soup_websocket_connection_get_state (ws_conn) ==
      SOUP_WEBSOCKET_STATE_OPEN)
    soup_websocket_connection_close (ws_conn, 1000, "");

  if (loop)
    g_main_loop_quit (loop);
  g_print ("g_main_loop quit\n");

  /* To allow usage as a GSourceFunc */
  return G_SOURCE_REMOVE;
}
static gchar *
get_string_from_json_object (JsonObject * object)
{
  JsonNode *root;
  JsonGenerator *generator;
  gchar *text;

  /* Make it the root node */
  root = json_node_init_object (json_node_alloc (), object);
  generator = json_generator_new ();
  json_generator_set_root (generator, root);
  text = json_generator_to_data (generator, NULL);

  /* Release everything */
  g_object_unref (generator);
  json_node_free (root);
  return text;
}
static void
handle_media_stream (GstPad * pad, GstElement * pipe, const char *convert_name,
    const char *sink_name)
{
  GstPad *qpad;
  GstElement *q, *conv, *resample, *sink;
  GstPadLinkReturn ret;

  g_print ("Trying to handle stream with %s ! %s", convert_name, sink_name);

  q = gst_element_factory_make ("queue", NULL);
  g_assert_nonnull (q);
  conv = gst_element_factory_make (convert_name, NULL);
  g_assert_nonnull (conv);
  sink = gst_element_factory_make (sink_name, NULL);
  g_assert_nonnull (sink);

  if (g_strcmp0 (convert_name, "audioconvert") == 0) {
    /* Audio also needs a resampler; it is a no-op when not required. */
    resample = gst_element_factory_make ("audioresample", NULL);
    g_assert_nonnull (resample);
    gst_bin_add_many (GST_BIN (pipe), q, conv, resample, sink, NULL);
    gst_element_sync_state_with_parent (q);
    gst_element_sync_state_with_parent (conv);
    gst_element_sync_state_with_parent (resample);
    gst_element_sync_state_with_parent (sink);
    gst_element_link_many (q, conv, resample, sink, NULL);
  } else {
    gst_bin_add_many (GST_BIN (pipe), q, conv, sink, NULL);
    gst_element_sync_state_with_parent (q);
    gst_element_sync_state_with_parent (conv);
    gst_element_sync_state_with_parent (sink);
    gst_element_link_many (q, conv, sink, NULL);
  }

  qpad = gst_element_get_static_pad (q, "sink");

  ret = gst_pad_link (pad, qpad);
  g_assert_cmphex (ret, ==, GST_PAD_LINK_OK);
}
static void
on_incoming_decodebin_stream (GstElement * decodebin, GstPad * pad,
    GstElement * pipe)
{
  GstCaps *caps;
  const gchar *name;

  if (!gst_pad_has_current_caps (pad)) {
    g_printerr ("Pad '%s' has no caps, can't do anything, ignoring\n",
        GST_PAD_NAME (pad));
    return;
  }

  caps = gst_pad_get_current_caps (pad);
  name = gst_structure_get_name (gst_caps_get_structure (caps, 0));

  if (g_str_has_prefix (name, "video")) {
    handle_media_stream (pad, pipe, "videoconvert", "autovideosink");
  } else if (g_str_has_prefix (name, "audio")) {
    handle_media_stream (pad, pipe, "audioconvert", "autoaudiosink");
  } else {
    g_printerr ("Unknown pad %s, ignoring", GST_PAD_NAME (pad));
  }
}
static void
on_incoming_stream (GstElement * webrtc, GstPad * pad, GstElement * pipe)
{
  GstElement *decodebin;
  GstPad *sinkpad;

  if (GST_PAD_DIRECTION (pad) != GST_PAD_SRC)
    return;

  decodebin = gst_element_factory_make ("decodebin", NULL);
  g_signal_connect (decodebin, "pad-added",
      G_CALLBACK (on_incoming_decodebin_stream), pipe);
  gst_bin_add (GST_BIN (pipe), decodebin);
  gst_element_sync_state_with_parent (decodebin);

  sinkpad = gst_element_get_static_pad (decodebin, "sink");
  gst_pad_link (pad, sinkpad);
  gst_object_unref (sinkpad);
}
static void
send_ice_candidate_message (GstElement * webrtc G_GNUC_UNUSED, guint mlineindex,
    gchar * candidate, gpointer user_data G_GNUC_UNUSED)
{
  gchar *text;
  JsonObject *ice, *msg;

  ice = json_object_new ();
  json_object_set_string_member (ice, "candidate", candidate);
  json_object_set_int_member (ice, "sdpMLineIndex", mlineindex);
  msg = json_object_new ();
  json_object_set_object_member (msg, "ice", ice);
  text = get_string_from_json_object (msg);
  json_object_unref (msg);

  soup_websocket_connection_send_text (ws_conn, text);
  g_free (text);
}
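/*
 * For reference, the ICE message serialized above has this shape (the
 * candidate string here is shortened and uses a documentation IP, purely for
 * illustration):
 *
 *   {"ice": {"candidate": "candidate:1 1 UDP 2122260223 192.0.2.1 9 typ host",
 *            "sdpMLineIndex": 0}}
 */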
static void
send_sdp_to_peer (GstWebRTCSessionDescription * desc)
{
  gchar *text;
  JsonObject *msg, *sdp;

  text = gst_sdp_message_as_text (desc->sdp);
  sdp = json_object_new ();

  if (desc->type == GST_WEBRTC_SDP_TYPE_OFFER) {
    g_print ("Sending offer:\n%s\n", text);
    json_object_set_string_member (sdp, "type", "offer");
  } else if (desc->type == GST_WEBRTC_SDP_TYPE_ANSWER) {
    g_print ("Sending answer:\n%s\n", text);
    json_object_set_string_member (sdp, "type", "answer");
  } else {
    g_assert_not_reached ();
  }

  json_object_set_string_member (sdp, "sdp", text);
  g_free (text);

  msg = json_object_new ();
  json_object_set_object_member (msg, "sdp", sdp);
  text = get_string_from_json_object (msg);
  json_object_unref (msg);

  soup_websocket_connection_send_text (ws_conn, text);
  g_free (text);
}
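/*
 * The SDP wrapped above is sent to the peer as JSON of the form (SDP body
 * abbreviated for illustration):
 *
 *   {"sdp": {"type": "offer", "sdp": "v=0\r\no=- ... "}}
 */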
static void
on_offer_created (GstPromise * promise, gpointer user_data)
{
  GstWebRTCSessionDescription *offer = NULL;
  const GstStructure *reply;

  g_assert_cmphex (gst_promise_wait (promise), ==, GST_PROMISE_RESULT_REPLIED);
  reply = gst_promise_get_reply (promise);
  gst_structure_get (reply, "offer",
      GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
  gst_promise_unref (promise);

  promise = gst_promise_new ();
  g_signal_emit_by_name (webrtc1, "set-local-description", offer, promise);
  gst_promise_interrupt (promise);
  gst_promise_unref (promise);

  /* Send the offer to the remote peer via the signalling server */
  send_sdp_to_peer (offer);
  gst_webrtc_session_description_free (offer);
}
static void
on_negotiation_needed (GstElement * element, gpointer user_data)
{
  GstPromise *promise;

  promise = gst_promise_new_with_change_func (on_offer_created, user_data, NULL);
  g_signal_emit_by_name (webrtc1, "create-offer", NULL, promise);
}
static void
on_ice_gathering_state_notify (GstElement * webrtcbin, GParamSpec * pspec,
    gpointer user_data)
{
  GstWebRTCICEGatheringState ice_gather_state;
  const gchar *new_state = "unknown";

  g_object_get (webrtcbin, "ice-gathering-state", &ice_gather_state, NULL);
  switch (ice_gather_state) {
    case GST_WEBRTC_ICE_GATHERING_STATE_NEW:
      new_state = "new";
      break;
    case GST_WEBRTC_ICE_GATHERING_STATE_GATHERING:
      new_state = "gathering";
      break;
    case GST_WEBRTC_ICE_GATHERING_STATE_COMPLETE:
      new_state = "complete";
      break;
  }
  g_print ("ICE gathering state changed to %s\n", new_state);
}
#define STUN_SERVER " stun-server=stun://stun.l.google.com:19302 "
#define RTP_CAPS_OPUS "application/x-rtp,media=audio,encoding-name=OPUS,payload="
#define RTP_CAPS_VP8 "application/x-rtp,media=video,encoding-name=VP8,payload="

static gboolean
start_pipeline (void)
{
  GstStateChangeReturn ret;
  GError *error = NULL;

  if (audio_device == NULL) {
    /* Video-only pipeline: the leading videoconvert is left with an unlinked
     * sink pad so the ROS appsrc can be attached below. */
    gchar *pl = g_strconcat (
        "webrtcbin bundle-policy=max-bundle name=sendrecv " STUN_SERVER
        /* (TURN-server options elided) */
        "videoconvert ! queue ! vp8enc deadline=1 target-bitrate=", video_bitrate,
        "000 ! rtpvp8pay ! "
        "queue ! " RTP_CAPS_VP8 "96 ! sendrecv. ", NULL);
    pipe1 = gst_parse_launch (pl, &error);
    g_free (pl);
  } else {
    /* Same video branch plus ALSA audio capture. */
    gchar *pl = g_strconcat (
        "webrtcbin bundle-policy=max-bundle name=sendrecv " STUN_SERVER
        /* (TURN-server options elided) */
        "videoconvert ! queue ! vp8enc deadline=1 target-bitrate=", video_bitrate,
        "000 ! rtpvp8pay ! "
        "queue ! " RTP_CAPS_VP8 "96 ! sendrecv. "
        "alsasrc device=hw:", audio_device,
        " ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! "
        "queue ! " RTP_CAPS_OPUS "97 ! sendrecv. ", NULL);
    pipe1 = gst_parse_launch (pl, &error);
    g_free (pl);
  }

  if (error) {
    g_printerr ("Failed to parse launch: %s\n", error->message);
    g_error_free (error);
    g_clear_object (&pipe1);
    return FALSE;
  }

  /* ROS images are injected through an appsrc that is linked to the first
   * unlinked sink pad of the parsed pipeline (the leading videoconvert). */
  appsrc_ = gst_element_factory_make ("appsrc", "source");
  if (appsrc_ == nullptr) {
    ROS_ERROR ("GST: failed to create appsrc!");
    return FALSE;
  }

  gst_app_src_set_stream_type (GST_APP_SRC_CAST (appsrc_), GST_APP_STREAM_TYPE_STREAM);
  gst_app_src_set_latency (GST_APP_SRC_CAST (appsrc_), 0, -1);
  g_object_set (GST_OBJECT (appsrc_),
      "format", GST_FORMAT_TIME,
      "is-live", true,
      "do-timestamp", true,
      NULL);

  GstPad *inpad = gst_bin_find_unlinked_pad (GST_BIN (pipe1), GST_PAD_SINK);

  GstElement *inelement = gst_pad_get_parent_element (inpad);
  gst_object_unref (GST_OBJECT (inpad));
  ROS_INFO ("GST: inelement: %s", gst_element_get_name (inelement));

  if (!gst_bin_add (GST_BIN (pipe1), appsrc_)) {
    ROS_ERROR ("GST: gst_bin_add() failed!");
    gst_object_unref (GST_OBJECT (pipe1));
    gst_object_unref (GST_OBJECT (inelement));
    return FALSE;
  }

  if (!gst_element_link (appsrc_, inelement)) {
    ROS_ERROR ("GST: cannot link %s -> %s",
        gst_element_get_name (appsrc_),
        gst_element_get_name (inelement));
    gst_object_unref (GST_OBJECT (pipe1));
    gst_object_unref (GST_OBJECT (inelement));
    return FALSE;
  }

  gst_object_unref (GST_OBJECT (inelement));

  webrtc1 = gst_bin_get_by_name (GST_BIN (pipe1), "sendrecv");
  g_assert_nonnull (webrtc1);

  /* This is the gstwebrtc entry point where we create the offer; it will be
   * called when the pipeline goes to PLAYING. */
  g_signal_connect (webrtc1, "on-negotiation-needed",
      G_CALLBACK (on_negotiation_needed), NULL);
  /* We need to transmit each ICE candidate to the peer via the websocket
   * signalling server. */
  g_signal_connect (webrtc1, "on-ice-candidate",
      G_CALLBACK (send_ice_candidate_message), NULL);
  g_signal_connect (webrtc1, "notify::ice-gathering-state",
      G_CALLBACK (on_ice_gathering_state_notify), NULL);

  gst_element_set_state (pipe1, GST_STATE_READY);

  g_print ("Starting pipeline\n");
  ret = gst_element_set_state (GST_ELEMENT (pipe1), GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE) {
    g_print ("ERROR starting pipeline\n");
    g_clear_object (&pipe1);
    return FALSE;
  }

  return TRUE;
}
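/*
 * For illustration, with --video-bitrate 500 and the payload value assumed
 * above, the video branch of the assembled launch string expands to roughly:
 *
 *   videoconvert ! queue ! vp8enc deadline=1 target-bitrate=500000 ! rtpvp8pay !
 *   queue ! application/x-rtp,media=video,encoding-name=VP8,payload=96 ! sendrecv.
 */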
static gboolean
setup_call (void)
{
  gchar *msg;

  if (soup_websocket_connection_get_state (ws_conn) !=
      SOUP_WEBSOCKET_STATE_OPEN)
    return FALSE;

  if (!peer_id)
    return FALSE;

  g_print ("Setting up signalling server call with %s\n", peer_id);
  msg = g_strdup_printf ("SESSION %s", peer_id);
  soup_websocket_connection_send_text (ws_conn, msg);
  g_free (msg);
  return TRUE;
}

static gboolean
register_with_server (void)
{
  gchar *hello;
  gint32 our_id;

  if (soup_websocket_connection_get_state (ws_conn) !=
      SOUP_WEBSOCKET_STATE_OPEN)
    return FALSE;

  /* Pick a random id to register under */
  our_id = g_random_int_range (10, 10000);
  g_print ("Registering id %i with server\n", our_id);

  hello = g_strdup_printf ("HELLO %i", our_id);
  soup_websocket_connection_send_text (ws_conn, hello);
  g_free (hello);
  return TRUE;
}
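/*
 * Sketch of the signalling exchange this node assumes, based on the
 * plain-text commands used above and in on_server_message(); the exact server
 * behaviour may differ:
 *
 *   client -> server : HELLO <our_id>
 *   server -> client : HELLO
 *   client -> server : SESSION <peer_id>
 *   server -> client : SESSION_OK
 *   both directions  : JSON messages with "sdp" and "ice" members, relayed verbatim
 */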
static void
on_server_closed (SoupWebsocketConnection * conn G_GNUC_UNUSED,
    gpointer user_data G_GNUC_UNUSED)
{
  g_print ("Server connection closed, trying to continue anyway\n");

  std_msgs::String msg;
  msg.data = "disconnected";
  webrtcstatus_pub.publish (msg);
}
/* One mega message handler for our asynchronous calling mechanism */
static void
on_server_message (SoupWebsocketConnection * conn, SoupWebsocketDataType type,
    GBytes * message, gpointer user_data)
{
  gchar *text;

  switch (type) {
    case SOUP_WEBSOCKET_DATA_BINARY:
      g_printerr ("Received unknown binary message, ignoring\n");
      return;
    case SOUP_WEBSOCKET_DATA_TEXT: {
      gsize size;
      const gchar *data =
          reinterpret_cast<const gchar *>(g_bytes_get_data (message, &size));
      /* Convert to NULL-terminated string */
      text = g_strndup (data, size);
      break;
    }
    default:
      g_assert_not_reached ();
  }

  /* Server has accepted our registration, we are ready to send commands */
  if (g_strcmp0 (text, "HELLO") == 0) {
    g_print ("Registered with server\n");
    /* Ask the signalling server to connect us with the configured peer */
    setup_call ();
  /* The call has been set up by the server, we can start negotiating */
  } else if (g_strcmp0 (text, "SESSION_OK") == 0) {
    start_pipeline ();
  /* Handle errors reported by the signalling server */
  } else if (g_str_has_prefix (text, "ERROR")) {
    g_printerr ("%s\n", text);
  } else {
    /* Look for JSON messages containing SDP and ICE candidates */
    JsonNode *root;
    JsonObject *object, *child;
    JsonParser *parser = json_parser_new ();

    if (!json_parser_load_from_data (parser, text, -1, NULL)) {
      g_printerr ("Unknown message '%s', ignoring", text);
      g_object_unref (parser);
      goto out;
    }

    root = json_parser_get_root (parser);
    if (!JSON_NODE_HOLDS_OBJECT (root)) {
      g_printerr ("Unknown json message '%s', ignoring", text);
      g_object_unref (parser);
      goto out;
    }

    object = json_node_get_object (root);
    /* Check the type of JSON message */
    if (json_object_has_member (object, "sdp")) {
      int ret;
      GstSDPMessage *sdp;
      const gchar *text, *sdptype;
      GstWebRTCSessionDescription *answer;

      child = json_object_get_object_member (object, "sdp");

      if (!json_object_has_member (child, "type")) {
        g_printerr ("ERROR: received SDP without 'type'\n");
        g_object_unref (parser);
        goto out;
      }

      sdptype = json_object_get_string_member (child, "type");
      text = json_object_get_string_member (child, "sdp");
      ret = gst_sdp_message_new (&sdp);
      g_assert_cmphex (ret, ==, GST_SDP_OK);
      ret = gst_sdp_message_parse_buffer ((guint8 *) text, strlen (text), sdp);
      g_assert_cmphex (ret, ==, GST_SDP_OK);

      if (g_str_equal (sdptype, "answer")) {
        g_print ("Received answer:\n%s\n", text);
        answer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER,
            sdp);
        g_assert_nonnull (answer);

        /* Set remote description on our pipeline */
        {
          GstPromise *promise = gst_promise_new ();
          g_signal_emit_by_name (webrtc1, "set-remote-description", answer,
              promise);
          gst_promise_interrupt (promise);
          gst_promise_unref (promise);
        }
      }
    } else if (json_object_has_member (object, "ice")) {
      const gchar *candidate;
      gint sdpmlineindex;

      child = json_object_get_object_member (object, "ice");
      candidate = json_object_get_string_member (child, "candidate");
      sdpmlineindex = json_object_get_int_member (child, "sdpMLineIndex");

      /* Add the ICE candidate sent by the remote peer */
      g_signal_emit_by_name (webrtc1, "add-ice-candidate", sdpmlineindex,
          candidate);
    } else {
      g_printerr ("Ignoring unknown JSON message:\n%s\n", text);
    }
    g_object_unref (parser);
  }

out:
  g_free (text);
}
static void
on_server_connected (SoupSession * session, GAsyncResult * res,
    SoupMessage * msg)
{
  GError *error = NULL;

  ws_conn = soup_session_websocket_connect_finish (session, res, &error);
  if (error) {
    g_printerr ("%s\n", error->message);
    g_error_free (error);
    return;
  }

  g_assert_nonnull (ws_conn);

  g_print ("Connected to signalling server\n");

  g_signal_connect (ws_conn, "closed", G_CALLBACK (on_server_closed), NULL);
  g_signal_connect (ws_conn, "message", G_CALLBACK (on_server_message), NULL);

  std_msgs::String rosmsg;
  rosmsg.data = "connected";
  webrtcstatus_pub.publish (rosmsg);

  /* Register with the server so it knows about us and can relay commands */
  register_with_server ();
}
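/*
 * The node reports its signalling status on the "webrtcstatus" topic
 * (std_msgs/String): "waiting" at startup, "connected" once the websocket is
 * established (above), and "disconnected" when the server closes the
 * connection.
 */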
static void
connect_to_websocket_server_async (void)
{
  SoupMessage *message;
  SoupSession *session;
  const char *https_aliases[] = { "wss", NULL };

  session = soup_session_new_with_options (SOUP_SESSION_SSL_STRICT, !disable_ssl,
      SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
      SOUP_SESSION_HTTPS_ALIASES, https_aliases, NULL);

  message = soup_message_new (SOUP_METHOD_GET, server_url);

  gchar *msg = g_strconcat ("Connecting to server: ", server_url, "\n", NULL);
  g_print ("%s", msg);
  g_free (msg);

  /* Once connected, we will register with the server */
  soup_session_websocket_connect_async (session, message, NULL, NULL,
      serverConnection, (GAsyncReadyCallback) on_server_connected, message);
}
static gboolean
check_plugins (void)
{
  guint i;
  gboolean ret = TRUE;
  GstPlugin *plugin;
  GstRegistry *registry;
  const gchar *needed[] = { "opus", "vpx", "nice", "webrtc", "dtls", "srtp",
    "rtpmanager", "videotestsrc", "audiotestsrc", NULL
  };

  registry = gst_registry_get ();
  for (i = 0; i < g_strv_length ((gchar **) needed); i++) {
    plugin = gst_registry_find_plugin (registry, needed[i]);
    if (!plugin) {
      g_print ("Required gstreamer plugin '%s' not found\n", needed[i]);
      ret = FALSE;
      continue;
    }
    gst_object_unref (plugin);
  }
  return ret;
}
GstCaps *gst_caps_new_from_image (const sensor_msgs::Image::ConstPtr &msg)
{
  /* Map ROS image encodings to GStreamer video/x-raw formats */
  static const ros::M_string known_formats = {
    {enc::RGB8, "RGB"},
    {enc::RGB16, "RGB16"},
    {enc::RGBA8, "RGBA"},
    {enc::RGBA16, "RGBA16"},
    {enc::BGR8, "BGR"},
    {enc::BGR16, "BGR16"},
    {enc::BGRA8, "BGRA"},
    {enc::BGRA16, "BGRA16"},
    {enc::MONO8, "GRAY8"},
    {enc::MONO16, "GRAY16_LE"},
  };

  if (msg->is_bigendian) {
    ROS_ERROR ("GST: big endian image format is not supported");
    return nullptr;
  }

  auto format = known_formats.find (msg->encoding);
  if (format == known_formats.end ()) {
    ROS_ERROR ("GST: image format '%s' unknown", msg->encoding.c_str ());
    return nullptr;
  }

  return gst_caps_new_simple ("video/x-raw",
      "format", G_TYPE_STRING, format->second.c_str (),
      "width", G_TYPE_INT, msg->width,
      "height", G_TYPE_INT, msg->height,
      "framerate", GST_TYPE_FRACTION, 0, 1,   /* 0/1 = variable frame rate */
      nullptr);
}
GstSample *gst_sample_new_from_image (const sensor_msgs::Image::ConstPtr &msg)
{
  auto buffer = gst_buffer_new_allocate (nullptr, msg->data.size (), nullptr);

  gst_buffer_fill (buffer, 0, msg->data.data (), msg->data.size ());
  GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_LIVE);

  auto caps = gst_caps_new_from_image (msg);
  if (caps == nullptr) {
    gst_object_unref (GST_OBJECT (buffer));
    return nullptr;
  }

  auto sample = gst_sample_new (buffer, caps, nullptr, nullptr);
  gst_buffer_unref (buffer);
  gst_caps_unref (caps);

  return sample;
}
void startLoopAndConnect ()
{
  loop = g_main_loop_new (NULL, FALSE);
  connect_to_websocket_server_async ();

  g_main_loop_run (loop);
  g_main_loop_unref (loop);
  g_print ("loop_thread_ exit\n");
}
void image_cb (const sensor_msgs::Image::ConstPtr &msg)
{
  auto sample = gst_sample_new_from_image (msg);
  if (sample == nullptr)
    return;

  auto push_ret = gst_app_src_push_sample (GST_APP_SRC_CAST (appsrc_), sample);
  gst_sample_unref (sample);
}
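/*
 * Note: push_ret above is a GstFlowReturn. A sketch of an optional check that
 * is not in the original listing could look like:
 *
 *   if (push_ret != GST_FLOW_OK)
 *     ROS_WARN("GST: appsrc refused the sample (flow return %d)", (int) push_ret);
 */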
int main (int argc, char **argv)
{
  GOptionContext *context;
  GError *error = NULL;

  context = g_option_context_new ("- gstreamer webrtc sendrecv");
  g_option_context_add_main_entries (context, entries, NULL);
  g_option_context_add_group (context, gst_init_get_option_group ());
  if (!g_option_context_parse (context, &argc, &argv, &error)) {
    g_printerr ("Error initializing: %s\n", error->message);
    return -1;
  }

  if (!check_plugins ())
    return -1;

  /* Disable ssl when running against a localhost server, because it is
   * probably a test server with a self-signed certificate. */
  {
    GstUri *uri = gst_uri_from_string (server_url);
    if (g_strcmp0 ("localhost", gst_uri_get_host (uri)) == 0 ||
        g_strcmp0 ("127.0.0.1", gst_uri_get_host (uri)) == 0)
      disable_ssl = TRUE;
    gst_uri_unref (uri);
  }

  /* (ros::init() and the image_transport subscription are elided here) */
  ros::NodeHandle n;
  webrtcstatus_pub = n.advertise<std_msgs::String> ("webrtcstatus", 10);

  std_msgs::String msg;
  msg.data = "waiting";
  webrtcstatus_pub.publish (msg);

  /* (signalling-thread startup and the ros::spinOnce() service loop elided) */

  if (!serverConnectStarted) {
    g_printerr ("server connect failure, exit\n");
    return -1;
  }

  gst_element_set_state (GST_ELEMENT (pipe1), GST_STATE_NULL);
  g_print ("Pipeline stopped\n");
  gst_object_unref (pipe1);

  return 0;
}