data_callee.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 import rospy
5 import threading
6 import signal
7 import os
8 from utils.peer import *
9 from utils.service_client import *
10 from utils.plugin import *
11 from utils.data import *
12 
13 
def signal_handler(peer_id, token):
    """Build a signal handler that asks SkyWay to delete the peer object.

    The returned callable has the ``(signum, frame)`` signature expected by
    ``signal.signal``. It only sends the delete request built by
    ``delete_peer_request``; it does not terminate the process itself.

    Args:
        peer_id: Identifier of the peer object to delete.
        token: Token passed to ``delete_peer_request`` together with
            ``peer_id``.

    Returns:
        A two-argument handler suitable for ``signal.signal``.
    """

    def internal(signum, frame):
        # Delete the peer object to exit. The first parameter is named
        # ``signum`` so that it does not shadow the imported ``signal``
        # module inside the handler.
        request = delete_peer_request(peer_id, token)
        skyway_control(request)

    return internal
21 
22 
class EventListener(threading.Thread):
    """Background thread that polls SkyWay events for one peer and reacts.

    The thread repeatedly calls ``skyway_event`` and dispatches each event on
    its ``request_type`` ("PEER", "DATA" or "MEDIA") until a PEER "CLOSE"
    event clears ``_is_running``.
    """

    # Class-level defaults; per-instance values are assigned in ``__init__``.
    _peer_id = ""
    _token = ""
    _plugin_manager = None
    _is_running = True
    _pid_map = {}

    def __init__(self, peer_id, token):
        """Store the credentials used for every ``skyway_event`` call.

        Args:
            peer_id: Identifier of the peer whose events are polled.
            token: Token passed to ``skyway_event`` together with ``peer_id``.
        """
        super().__init__()
        self._peer_id = peer_id
        self._token = token
        # Give every listener its own map; the class-level dict would
        # otherwise be one object shared by all instances.
        self._pid_map = {}
        # NOTE(review): the listing this source was recovered from jumps from
        # line 33 to line 35 here, so one original line is missing. Nothing
        # visible ever assigns ``_plugin_manager``; if it was constructed on
        # the missing line, restore that assignment. Until then the methods
        # below skip plugin handling while it is ``None``.

    def run(self):
        """Poll events and dispatch them until ``_is_running`` is cleared."""
        node_name = rospy.get_name()
        has_data_config = rospy.has_param(node_name + "/data")
        if not has_data_config:
            rospy.logdebug("no data config. ignore data events")

        has_media_config = rospy.has_param(node_name + "/media")

        while self._is_running:
            event = skyway_event(self._peer_id, self._token)

            # Ignore empty or incomplete events instead of letting a
            # KeyError/TypeError terminate the listener thread.
            if event is None or "result" not in event:
                continue
            result = event["result"]
            if not result or "event" not in result:
                continue

            request_type = result.get("request_type")
            if request_type == "PEER":
                self._peer_event_react(
                    event, has_data_config, has_media_config
                )
            elif request_type == "DATA":
                if has_data_config:
                    self._data_event_react(event)
            elif request_type == "MEDIA":
                if has_media_config:
                    self._media_event_react(event)

    def _peer_event_react(self, event, has_data_config, has_media_config):
        """React to a PEER event ("OPEN", "CLOSE" or "CONNECTION").

        Args:
            event: Event mapping; ``event["result"]["event"]`` selects the
                reaction. Other event names are ignored.
            has_data_config: Whether a data config parameter exists; plugin
                redirection on "CONNECTION" is skipped when it is false.
            has_media_config: Unused here; kept for the caller's signature.
        """
        result = event["result"]
        event_name = result["event"]

        if event_name == "OPEN":
            # After this event fires, data may be transferred.
            rospy.loginfo("you can send data now")
        elif event_name == "CLOSE":
            # Terminate this program when the release of the peer object is
            # confirmed.
            self._is_running = False
            # Shut down SkyWay for ROS.
            skyway_control(
                '{"request_type": "SYSTEM", "command": "SHUTDOWN"}'
            )
            rospy.signal_shutdown("finish")
        elif event_name == "CONNECTION":
            self._on_connection(result, has_data_config)

    def _on_connection(self, result, has_data_config):
        """Log the new DataConnection's status and redirect it to a plugin."""
        data_params = result.get("data_params") or {}
        data_connection_id = data_params.get("data_connection_id")
        if data_connection_id is None:
            rospy.logwarn("CONNECTION event without data_connection_id")
            return

        # Get the status of the DataConnection.
        # The original comment says data_callee does this inside the OPEN
        # event (presumably the caller sample was meant - TODO confirm), but
        # in SkyWay the DataConnection is already established when the
        # CONNECTION event fires, so the status can be fetched here.
        status_request = create_data_status_request(data_connection_id)
        status_response = skyway_control(status_request)
        rospy.loginfo("DataConnection Status")
        rospy.loginfo(status_response)

        # Set up data reception.
        # Skip it when no data config was given in the config file.
        if not has_data_config:
            return

        status = result.get("status") or {}
        if "metadata" not in status:
            return

        plugin_id = self._extract_plugin_id(status["metadata"])
        if plugin_id is None:
            return

        if self._plugin_manager is None:
            rospy.logwarn("no plugin manager. skip redirecting data")
            return

        plugin_info = self._plugin_manager.on_connect(
            data_connection_id, plugin_id
        )
        if plugin_info:
            message = redirect_request(data_connection_id, plugin_info)
            skyway_control(message)

    @staticmethod
    def _extract_plugin_id(metadata):
        """Return ``connection_id`` from a JSON metadata string, else ``None``.

        ``None`` is returned when ``metadata`` is not valid JSON, does not
        decode to a JSON object, or has no (or a null) ``connection_id``.
        """
        # Local import: this block must not rely on ``json`` leaking in
        # through the module's star imports.
        import json

        try:
            parsed = json.loads(metadata)
        except (ValueError, TypeError):
            # json.JSONDecodeError is a ValueError; TypeError covers
            # metadata that is not str/bytes.
            return None
        if not isinstance(parsed, dict):
            return None
        return parsed.get("connection_id")

    def _data_event_react(self, event):
        """Notify the plugin manager when a DataConnection is closed.

        Unsuccessful events and events other than "CLOSE" are ignored.
        """
        if not event.get("is_success"):
            return

        result = event.get("result") or {}
        if result.get("event") != "CLOSE":
            return

        # Without a plugin manager there is nobody to notify.
        if self._plugin_manager is None:
            return

        self._plugin_manager.on_disconnect(result["data_connection_id"])

    def _media_event_react(self, event):
        """Ignore MEDIA events."""
        # Omitted because this is a data-oriented sample.
        pass
125 
126 
def main():
    """Create a SkyWay peer object and react to its events until shutdown.

    Reads the ``<node name>/peer_id`` parameter and the ``API_KEY``
    environment variable, creates the peer through ``skyway_control``,
    installs a SIGINT handler that deletes the peer, and runs an
    ``EventListener`` thread while ``rospy.spin()`` blocks.

    Exits with status 1 when ``peer_id`` or ``API_KEY`` is missing. Returns
    without starting the listener when peer creation is not successful.
    """
    # Local import: ``sys.exit`` replaces the ``exit`` helper injected by
    # ``site``, which is not guaranteed to exist in every interpreter setup.
    import sys

    peer_id_param = rospy.get_name() + "/peer_id"
    if not rospy.has_param(peer_id_param):
        rospy.logerr("no peer_id")
        # Non-zero status: a missing configuration value is a failure.
        sys.exit(1)
    peer_id = rospy.get_param(peer_id_param)

    key = os.environ.get("API_KEY")
    if key is None:
        rospy.logerr("no API_KEY")
        rospy.logerr("exiting")
        sys.exit(1)

    request = create_peer_request(peer_id, key)
    peer_create_response = skyway_control(request)

    if not peer_create_response or not peer_create_response["is_success"]:
        # Report the failure instead of returning silently.
        rospy.logerr("failed to create a peer object")
        rospy.logerr(peer_create_response)
        return

    # Succeeded in creating a peer object.
    peer_id = peer_create_response["result"]["peer_id"]
    token = peer_create_response["result"]["token"]

    # Optional Peer Status check.
    # The peer is already connected, so ``disconnected`` is expected to be
    # False in the response.
    status_request = create_peer_status_request(peer_id, token)
    status_response = skyway_control(status_request)
    rospy.loginfo("Peer Object has been created")
    rospy.loginfo(status_response)

    # Hook ctrl-c to delete the peer object when exiting.
    signal.signal(signal.SIGINT, signal_handler(peer_id, token))

    # This program only reacts to events.
    event_thread = EventListener(peer_id, token)
    event_thread.start()

    rospy.spin()
    event_thread.join()
165 
166 
if __name__ == "__main__":
    # Register this process as a ROS node first: main() asks rospy for the
    # node name and reads parameters under it.
    rospy.init_node(
        "my_node",
        log_level=rospy.INFO,
    )
    main()
def create_peer_status_request(peer_id, token)
Definition: peer.py:29
def skyway_event(peer_id, token)
def skyway_control(json_str)
def signal_handler(peer_id, token)
Definition: data_callee.py:14
def delete_peer_request(peer_id, token)
Definition: peer.py:16
def _media_event_react(self, event)
Definition: data_callee.py:122
def _data_event_react(self, event)
Definition: data_callee.py:113
def _peer_event_react(self, event, has_data_config, has_media_config)
Definition: data_callee.py:65
def create_peer_request(peer_id, api_key)
Definition: peer.py:1
def __init__(self, peer_id, token)
Definition: data_callee.py:30
Definition: main.py:1
def redirect_request(data_connection_id, plugin_info)
Definition: data.py:45
def create_data_status_request(data_connection_id)
Definition: data.py:58


skyway
Author(s): Toshiya Nakakura
autogenerated on Sat Apr 15 2023 02:08:21