data_caller.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 import rospy
5 import threading
6 import signal
7 import os
8 from utils.peer import *
9 from utils.data import *
10 from utils.service_client import *
11 
12 
13 def signal_handler(peer_id, token):
14  def internal(signal, frame):
15  # delete the peer object to exit
16  request = delete_peer_request(peer_id, token)
17  response = skyway_control(request)
18 
19  return internal
20 
21 
22 class EventListener(threading.Thread):
23  _peer_id = ""
24  _token = ""
25  _is_running = True
26  _pid_map = {}
27 
28  def __init__(self, peer_id, token):
29  super().__init__()
30  self._peer_id = peer_id
31  self._token = token
32 
33  def run(self):
34  has_data_config = rospy.has_param(rospy.get_name() + "/data")
35  if not has_data_config:
36  rospy.logdebug("no data config. ignore data events")
37 
38  has_media_config = rospy.has_param(rospy.get_name() + "/media")
39 
40  while self._is_running:
41  event = skyway_event(self._peer_id, self._token)
42  if event is None:
43  continue
44  if "result" not in event:
45  continue
46  if "event" not in event["result"]:
47  continue
48 
49  request_type = event["result"]["request_type"]
50  if request_type == "PEER":
51  self._peer_event_react(
52  event, has_data_config, has_media_config
53  )
54  elif request_type == "DATA":
55  if has_data_config:
56  self._data_event_react(event)
57  elif request_type == "MEDIA":
58  if has_media_config:
59  self._media_event_react(event)
60 
61  def _peer_event_react(self, event, has_data_config, has_media_config):
62  if event["result"]["event"] == "CLOSE":
63  # Terminate this program when the release of the peer object is confirmed.
64  self._is_running = False
65  # SkyWay for ROSを落とす
66  response = skyway_control(
67  '{"request_type": "SYSTEM", "command": "SHUTDOWN"}'
68  )
69  # gStreamer Controllerを落とす
70  rospy.signal_shutdown("finish")
71 
72  def _data_event_react(self, event):
73  if not event["is_success"]:
74  return
75 
76  # このイベント発火後、データの転送を行っても良い
77  if event["result"]["event"] == "OPEN":
78  # DataConnectionのstatus取得
79  data_connection_status_request = create_data_status_request(
80  event["result"]["data_connection_id"]
81  )
82  data_connection_status_response = skyway_control(
83  data_connection_status_request
84  )
85  rospy.loginfo("DataConnection Status")
86  rospy.loginfo(data_connection_status_response)
87 
88  rospy.loginfo("you can send data now")
89 
90  if event["result"]["event"] == "CLOSE":
91  rospy.loginfo(
92  f"DataConnection {event['result']['data_connection_id']} disconnected"
93  )
94 
95  def _media_event_react(self, event):
96  # data系のサンプルなので省略
97  pass
98 
99 
100 def main():
101  if not rospy.has_param(rospy.get_name() + "/peer_id"):
102  rospy.logerr("no peer_id")
103  exit(0)
104  peer_id = rospy.get_param(rospy.get_name() + "/peer_id")
105 
106  if "API_KEY" not in os.environ:
107  rospy.logerr("no API_KEY")
108  rospy.logerr("exiting")
109  exit(0)
110 
111  key = os.environ["API_KEY"]
112  request = create_peer_request(peer_id, key)
113  peer_create_response = skyway_control(request)
114 
115  if not peer_create_response["is_success"]:
116  return
117  else:
118  # succeed to create a peer object
119  peer_id = peer_create_response["result"]["peer_id"]
120  token = peer_create_response["result"]["token"]
121 
122  # Peer Statusのチェックをする場合
123  # 接続済みなので、disconnectedはFalseになっているのが正しい
124  status_request = create_peer_status_request(peer_id, token)
125  status_response = skyway_control(status_request)
126  rospy.loginfo("Peer Object has been created")
127  rospy.loginfo(status_response)
128 
129  # DataConnectionの確立を開始する
130  message = create_connect_request(
131  peer_id,
132  token,
133  "target_id",
134  {
135  "type": "string",
136  "plugins": [
137  {"plugin_name": "string_loopback::StringLoopback"}
138  ],
139  },
140  json.loads('{"foo": "bar"}'),
141  "da-50a32bab-b3d9-4913-8e20-f79c90a6a211",
142  "127.0.0.1",
143  10000,
144  )
145  response = skyway_control(message)
146 
147  # hook ctrl-c to delete the peer object when exiting
148  signal.signal(signal.SIGINT, signal_handler(peer_id, token))
149 
150  event_thread = EventListener(peer_id, token)
151  event_thread.start()
152 
153  rospy.spin()
154  event_thread.join()
155 
156 
157 if __name__ == "__main__":
158  rospy.init_node("my_node", log_level=rospy.INFO)
159  main()
def _data_event_react(self, event)
Definition: data_caller.py:72
def _peer_event_react(self, event, has_data_config, has_media_config)
Definition: data_caller.py:61
def create_peer_status_request(peer_id, token)
Definition: peer.py:29
def __init__(self, peer_id, token)
Definition: data_caller.py:28
def skyway_event(peer_id, token)
def skyway_control(json_str)
def create_connect_request(peer_id, token, target_id, plugin_info, metadata, data_id, redirect_ip, redirect_port)
Definition: data.py:13
def delete_peer_request(peer_id, token)
Definition: peer.py:16
def signal_handler(peer_id, token)
Definition: data_caller.py:13
def create_peer_request(peer_id, api_key)
Definition: peer.py:1
def _media_event_react(self, event)
Definition: data_caller.py:95
Definition: main.py:1
def create_data_status_request(data_connection_id)
Definition: data.py:58


skyway
Author(s): Toshiya Nakakura
autogenerated on Sat Apr 15 2023 02:08:21