media_callee.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 import sys
5 import json
6 import rospy
7 import threading
8 import signal
9 import subprocess
10 from subprocess import PIPE
11 import os
12 from utils.peer import *
13 from utils.service_client import *
14 from utils.media import *
15 
16 
17 def signal_handler(peer_id, token):
18  def internal(signal, frame):
19  # delete the peer object to exit
20  request = delete_peer_request(peer_id, token)
21  response = skyway_control(request)
22 
23  return internal
24 
25 
26 class EventListener(threading.Thread):
27  _peer_id = ""
28  _token = ""
29  _is_running = True
30  _pid_map = {}
31 
32  def __init__(self, peer_id, token):
33  super().__init__()
34  self._peer_id = peer_id
35  self._token = token
36 
37  def run(self):
38  has_data_config = rospy.has_param(rospy.get_name() + "/data")
39  if not has_data_config:
40  rospy.logdebug("no data config. ignore data events")
41 
42  has_media_config = rospy.has_param(rospy.get_name() + "/media")
43  if not has_media_config:
44  rospy.logdebug("no media config. ignore media events")
45 
46  while self._is_running:
47  event = skyway_event(self._peer_id, self._token)
48  if event is None:
49  continue
50  if "result" not in event:
51  continue
52  if "event" not in event["result"]:
53  continue
54 
55  request_type = event["result"]["request_type"]
56  if request_type == "PEER":
57  self._peer_event_react(
58  event, has_data_config, has_media_config
59  )
60  elif request_type == "DATA":
61  if has_data_config:
62  self._data_event_react(event)
63  elif request_type == "MEDIA":
64  if has_media_config:
65  self._media_event_react(event)
66 
67  def _peer_event_react(self, event, has_data_config, has_media_config):
68  if event["result"]["event"] == "CLOSE":
69  # Terminate this program when the release of the peer object is confirmed.
70  self._is_running = False
71  # SkyWay for ROSを落とす
72  response = skyway_control(
73  '{"request_type": "SYSTEM", "command": "SHUTDOWN"}'
74  )
75  # gStreamer Controllerを落とす
76  (result, pid) = gst_launch("SYSTEM_EXIT", "", 0)
77  rospy.signal_shutdown("finish")
78  elif event["result"]["event"] == "CALL":
79  if not has_media_config:
80  return
81 
82  media_connection_id = event["result"]["call_params"][
83  "media_connection_id"
84  ]
85  answer_query = create_answer_query(media_connection_id)
86  response = skyway_control(answer_query)
87 
88  def _data_event_react(self, event):
89  # media系のサンプルなのでdata系の処理は省略
90  pass
91 
92  def _media_event_react(self, event):
93  if not event["is_success"]:
94  return
95 
96  if event["result"]["event"] == "STREAM":
97  # media connectionのstatusを取得
98  media_status_request = create_media_status_request(
99  event["result"]["media_connection_id"]
100  )
101  media_status_response = skyway_control(media_status_request)
102  rospy.loginfo("MediaConnection Status")
103  rospy.loginfo(media_status_response)
104 
105  # STREAMイベントが発火したあとはメディアの転送を行って良い
106  # メディアの送信先情報に関しては、eventパラメータから取得できる。
107  # このサンプルでは、取得した情報を用いて、config内のスクリプトを置き換え、gStreamerを起動している
108 
109  # スクリプトの取得
110  script = rospy.get_param(rospy.get_name() + "/media/gst_script")
111 
112  # videoの送信先情報の取得
113  video_send_params = parse_media_info(
114  event["result"]["send_params"]["video"]["media"]
115  )
116  # video RTPの送信先ポートの情報をセット
117  script = script.replace(
118  "DEST_VIDEO_RTP_PORT", str(video_send_params[1])
119  )
120  # video RTCPの送信先ポートの情報をセット
121  script = script.replace(
122  "DEST_VIDEO_RTCP_PORT", str(video_send_params[3])
123  )
124  # videoの送信先はaudioと同じであるため最後に一回置換する
125 
126  # audioの送信先情報の取得
127  audio_send_params = parse_media_info(
128  event["result"]["send_params"]["audio"]["media"]
129  )
130  # audio RTPの送信先ポートの情報をセット
131  script = script.replace(
132  "DEST_AUDIO_RTP_PORT", str(audio_send_params[1])
133  )
134  # audio RTCPの送信先ポートの情報をセット
135  script = script.replace(
136  "DEST_AUDIO_RTCP_PORT", str(audio_send_params[3])
137  )
138  # audioの送信先はvideoと同じであるため最後に一回置換する
139 
140  # videoの受信ポートの取得
141  # videoストリームはこのIPアドレスとポート番号に転送される
142  video_recv_params = parse_media_info(
143  event["result"]["redirect_params"]["video"]
144  )
145  # video RTPの受信ポートをセット
146  script = script.replace(
147  "SRC_VIDEO_RTP_PORT", str(video_recv_params[1])
148  )
149  # video RTCPの受信ポートをセット
150  script = script.replace(
151  "SRC_VIDEO_RTCP_PORT", str(video_recv_params[3])
152  )
153 
154  # audioの受信ポートの取得
155  # audioストリームはこのIPアドレスとポート番号に転送される
156  audio_recv_params = parse_media_info(
157  event["result"]["redirect_params"]["audio"]
158  )
159  # audio RTPの受信ポートをセット
160  script = script.replace(
161  "SRC_AUDIO_RTP_PORT", str(audio_recv_params[1])
162  )
163  # audio RTCPの受信ポートをセット
164  script = script.replace(
165  "SRC_AUDIO_RTCP_PORT", str(audio_recv_params[3])
166  )
167 
168  # video, audioの送信先IPアドレスのセット
169  # RTCPの送信先IPアドレスも取得している(Tupleの3つめ)が、今回の構成だと同じなので、スクリプト上で省略している
170  script = script.replace("DEST", str(video_send_params[0]))
171 
172  # gStreamer起動サービスのコール
173  (result, pid) = gst_launch("LAUNCH", script, 0)
174  if result:
175  self._pid_map[event["result"]["media_connection_id"]] = pid
176  elif event["result"]["event"] == "CLOSE":
177  media_connection_id = event["result"]["media_connection_id"]
178  if media_connection_id in self._pid_map:
179  pid = self._pid_map[media_connection_id]
180  (result, pid) = gst_launch("EXIT", "", pid)
181  self._pid_map.pop(media_connection_id)
182 
183 
184 def main():
185  if not rospy.has_param(rospy.get_name() + "/peer_id"):
186  rospy.logerr("no peer_id")
187  exit(0)
188  peer_id = rospy.get_param(rospy.get_name() + "/peer_id")
189 
190  if "API_KEY" not in os.environ:
191  rospy.logerr("no API_KEY")
192  rospy.logerr("exiting")
193  exit(0)
194 
195  key = os.environ["API_KEY"]
196  request = create_peer_request(peer_id, key)
197  peer_create_response = skyway_control(request)
198 
199  if not peer_create_response["is_success"]:
200  return
201  else:
202  # succeed to create a peer object
203  peer_id = peer_create_response["result"]["peer_id"]
204  token = peer_create_response["result"]["token"]
205 
206  # Peer Statusのチェックをする場合
207  # 接続済みなので、disconnectedはFalseになっているのが正しい
208  status_request = create_peer_status_request(peer_id, token)
209  status_response = skyway_control(status_request)
210  rospy.loginfo("Peer Object has been created")
211  rospy.loginfo(status_response)
212 
213  # hook ctrl-c to delete the peer object when exiting
214  signal.signal(signal.SIGINT, signal_handler(peer_id, token))
215 
216  # this program only reacts to events
217  event_thread = EventListener(peer_id, token)
218  event_thread.start()
219 
220  rospy.spin()
221  event_thread.join()
222 
223 
224 if __name__ == "__main__":
225  rospy.init_node("my_node", log_level=rospy.INFO)
226  main()
def gst_launch(message_type, command, pid)
def create_peer_status_request(peer_id, token)
Definition: peer.py:29
def signal_handler(peer_id, token)
Definition: media_callee.py:17
def skyway_event(peer_id, token)
def skyway_control(json_str)
def _data_event_react(self, event)
Definition: media_callee.py:88
def delete_peer_request(peer_id, token)
Definition: peer.py:16
def __init__(self, peer_id, token)
Definition: media_callee.py:32
def create_answer_query(media_connection_id)
Definition: media.py:62
def _media_event_react(self, event)
Definition: media_callee.py:92
def parse_media_info(object)
Definition: media.py:126
def create_media_status_request(media_connection_id)
Definition: media.py:114
def create_peer_request(peer_id, api_key)
Definition: peer.py:1
Definition: main.py:1
def _peer_event_react(self, event, has_data_config, has_media_config)
Definition: media_callee.py:67


skyway
Author(s): Toshiya Nakakura
autogenerated on Sat Apr 15 2023 02:08:21